Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
#include "time_util.h"
#include "types/redis_stream_base.h"

const char *errFailedToSendCommands = "failed to send commands to restore a key";
const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation";
const char *errFailedToSetImportStatus = "failed to set import status on destination node";
const char *errUnsupportedMigrationType = "unsupported migration type";
constexpr std::string_view errFailedToSendCommands = "failed to send commands to restore a key";
constexpr std::string_view errMigrationTaskCanceled = "key migration stopped due to a task cancellation";
constexpr std::string_view errFailedToSetImportStatus = "failed to set import status on destination node";
constexpr std::string_view errUnsupportedMigrationType = "unsupported migration type";

static std::map<RedisType, std::string> type_to_cmd = {
{kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"},
Expand Down Expand Up @@ -318,7 +318,7 @@ Status SlotMigrator::sendSnapshot() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return sendSnapshotByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}

Status SlotMigrator::syncWAL() {
Expand All @@ -327,7 +327,7 @@ Status SlotMigrator::syncWAL() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return syncWALByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}

Status SlotMigrator::sendSnapshotByCmd() {
Expand All @@ -337,7 +337,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
std::string restore_cmds;
SlotRange slot_range = slot_range_;

LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" << slot_range.String();
LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " << slot_range.String();

// Construct key prefix to iterate the keys belong to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
Expand All @@ -351,12 +351,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Seek to the beginning of keys start with 'prefix' and iterate all these keys
auto current_slot = slot_range.start;
int current_slot = slot_range.start;
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from master to slave
// or flush command (flushdb or flushall) is executed
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Iteration is out of range
Expand All @@ -366,7 +366,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
}

// Get user key
auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);
auto [_, user_key] = ExtractNamespaceKey(iter->key(), /*slot_id_encoded=*/true);

// Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size
auto result = migrateOneKey(user_key, iter->value(), &restore_cmds);
Expand Down Expand Up @@ -429,7 +429,7 @@ Status SlotMigrator::syncWALByCmd() {

Status SlotMigrator::finishSuccessfulMigration() {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Set import status on the destination node to SUCCESS
Expand Down Expand Up @@ -723,6 +723,12 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k
}
break;
}
case kRedisHyperLogLog: {
// HyperLogLog migration by cmd is not supported,
// since it's hard to restore the same key structure for HyperLogLog
// commands.
break;
}
default:
break;
}
Expand Down Expand Up @@ -752,7 +758,17 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata

Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) {
std::string cmd;
cmd = type_to_cmd[metadata.Type()];
{
auto iter = type_to_cmd.find(metadata.Type());
if (iter != type_to_cmd.end()) {
cmd = iter->second;
} else {
if (metadata.Type() > RedisTypeNames.size()) {
return {Status::NotOK, "unknown key type: " + std::to_string(metadata.Type())};
}
return {Status::NotOK, "unsupported complex key type: " + RedisTypeNames[metadata.Type()]};
}
}

std::vector<std::string> user_cmd = {cmd, key.ToString()};
// Construct key prefix to iterate values of the complex type user key
Expand All @@ -769,7 +785,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

if (!iter->key().starts_with(prefix_subkey)) {
Expand Down Expand Up @@ -811,6 +827,9 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
user_cmd.emplace_back(iter->value().ToString());
break;
}
case kRedisHyperLogLog: {
break;
}
default:
break;
}
Expand Down Expand Up @@ -878,7 +897,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

if (!iter->key().starts_with(prefix_key)) {
Expand Down Expand Up @@ -964,7 +983,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<

Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Check pipeline
Expand Down
29 changes: 18 additions & 11 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,25 @@
#include <vector>

#include "batch_sender.h"
#include "config.h"
#include "encoding.h"
#include "parse_util.h"
#include "redis_slot.h"
#include "server/server.h"
#include "slot_import.h"
#include "stats/stats.h"
#include "status.h"
#include "storage/redis_db.h"
#include "unique_fd.h"

enum class MigrationType { kRedisCommand = 0, kRawKeyValue };
enum class MigrationType {
/// Use Redis commands to migrate data.
/// It will trying to extract commands from existing data and log, then replay
/// them on the destination node.
kRedisCommand = 0,
/// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
///
/// If downstream is not compatible with raw key-value, this migration type will
/// auto switch to kRedisCommand.
kRawKeyValue
};

enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };

Expand Down Expand Up @@ -111,7 +118,7 @@ class SlotMigrator : public redis::Database {
private:
void loop();
void runMigrationProcess();
bool isTerminated() { return thread_state_ == ThreadState::Terminated; }
bool isTerminated() const { return thread_state_ == ThreadState::Terminated; }
Status startMigration();
Status sendSnapshot();
Status syncWAL();
Expand Down Expand Up @@ -158,11 +165,11 @@ class SlotMigrator : public redis::Database {
enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };

static const int kDefaultMaxPipelineSize = 16;
static const int kDefaultMaxMigrationSpeed = 4096;
static const int kDefaultSequenceGapLimit = 10000;
static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys
static const int kMaxLoopTimes = 10;
static constexpr int kDefaultMaxPipelineSize = 16;
static constexpr int kDefaultMaxMigrationSpeed = 4096;
static constexpr int kDefaultSequenceGapLimit = 10000;
static constexpr int kMaxItemsInCommand = 16; // number of items in every write command of complex keys
static constexpr int kMaxLoopTimes = 10;

Server *srv_;

Expand All @@ -183,7 +190,7 @@ class SlotMigrator : public redis::Database {
std::thread t_;
std::mutex job_mutex_;
std::condition_variable job_cv_;
std::unique_ptr<SlotMigrationJob> migration_job_;
std::unique_ptr<SlotMigrationJob> migration_job_; // GUARDED_BY(job_mutex_)

std::string dst_node_;
std::string dst_ip_;
Expand Down
2 changes: 1 addition & 1 deletion src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum StatsMetricFlags {
STATS_METRIC_COUNT
};

const int STATS_METRIC_SAMPLES = 16; // Number of samples per metric
constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric

struct CommandStat {
std::atomic<uint64_t> calls;
Expand Down
Loading