diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index f91c9a1afdc..cd7e9e42486 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -34,10 +34,10 @@ #include "time_util.h" #include "types/redis_stream_base.h" -const char *errFailedToSendCommands = "failed to send commands to restore a key"; -const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation"; -const char *errFailedToSetImportStatus = "failed to set import status on destination node"; -const char *errUnsupportedMigrationType = "unsupported migration type"; +constexpr std::string_view errFailedToSendCommands = "failed to send commands to restore a key"; +constexpr std::string_view errMigrationTaskCanceled = "key migration stopped due to a task cancellation"; +constexpr std::string_view errFailedToSetImportStatus = "failed to set import status on destination node"; +constexpr std::string_view errUnsupportedMigrationType = "unsupported migration type"; static std::map type_to_cmd = { {kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"}, @@ -318,7 +318,7 @@ Status SlotMigrator::sendSnapshot() { } else if (migration_type_ == MigrationType::kRawKeyValue) { return sendSnapshotByRawKV(); } - return {Status::NotOK, errUnsupportedMigrationType}; + return {Status::NotOK, std::string(errUnsupportedMigrationType)}; } Status SlotMigrator::syncWAL() { @@ -327,7 +327,7 @@ Status SlotMigrator::syncWAL() { } else if (migration_type_ == MigrationType::kRawKeyValue) { return syncWALByRawKV(); } - return {Status::NotOK, errUnsupportedMigrationType}; + return {Status::NotOK, std::string(errUnsupportedMigrationType)}; } Status SlotMigrator::sendSnapshotByCmd() { @@ -337,7 +337,7 @@ Status SlotMigrator::sendSnapshotByCmd() { std::string restore_cmds; SlotRange slot_range = slot_range_; - LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" << slot_range.String(); + LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " << slot_range.String(); // Construct key prefix to iterate the keys belong to the target slot std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start); @@ -351,12 +351,12 @@ Status SlotMigrator::sendSnapshotByCmd() { auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle)); // Seek to the beginning of keys start with 'prefix' and iterate all these keys - auto current_slot = slot_range.start; + int current_slot = slot_range.start; for (iter->Seek(prefix); iter->Valid(); iter->Next()) { // The migrating task has to be stopped, if server role is changed from master to slave // or flush command (flushdb or flushall) is executed if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + return {Status::NotOK, std::string(errMigrationTaskCanceled)}; } // Iteration is out of range @@ -366,7 +366,7 @@ Status SlotMigrator::sendSnapshotByCmd() { } // Get user key - auto [_, user_key] = ExtractNamespaceKey(iter->key(), true); + auto [_, user_key] = ExtractNamespaceKey(iter->key(), /*slot_id_encoded=*/true); // Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size auto result = migrateOneKey(user_key, iter->value(), &restore_cmds); @@ -429,7 +429,7 @@ Status SlotMigrator::syncWALByCmd() { Status SlotMigrator::finishSuccessfulMigration() { if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + return {Status::NotOK, std::string(errMigrationTaskCanceled)}; } // Set import status on the destination node to SUCCESS @@ -723,6 +723,12 @@ StatusOr SlotMigrator::migrateOneKey(const rocksdb::Slice &k } break; } + case kRedisHyperLogLog: { + // HyperLogLog migration by cmd is not supported, + // since it's hard to restore the same key structure for HyperLogLog + // commands. + break; + } default: break; } @@ -752,7 +758,17 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) { std::string cmd; - cmd = type_to_cmd[metadata.Type()]; + { + auto iter = type_to_cmd.find(metadata.Type()); + if (iter != type_to_cmd.end()) { + cmd = iter->second; + } else { + if (metadata.Type() > RedisTypeNames.size()) { + return {Status::NotOK, "unknown key type: " + std::to_string(metadata.Type())}; + } + return {Status::NotOK, "unsupported complex key type: " + RedisTypeNames[metadata.Type()]}; + } + } std::vector user_cmd = {cmd, key.ToString()}; // Construct key prefix to iterate values of the complex type user key @@ -769,7 +785,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) { if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + return {Status::NotOK, std::string(errMigrationTaskCanceled)}; } if (!iter->key().starts_with(prefix_subkey)) { @@ -811,6 +827,9 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata user_cmd.emplace_back(iter->value().ToString()); break; } + case kRedisHyperLogLog: { + break; + } default: break; } @@ -878,7 +897,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) { if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + return {Status::NotOK, std::string(errMigrationTaskCanceled)}; } if (!iter->key().starts_with(prefix_key)) { @@ -964,7 +983,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr< Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) { if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + return {Status::NotOK, std::string(errMigrationTaskCanceled)}; } // Check pipeline diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 179150b84b5..1114f2a1b55 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -35,18 +35,25 @@ #include #include "batch_sender.h" -#include "config.h" #include "encoding.h" #include "parse_util.h" -#include "redis_slot.h" #include "server/server.h" #include "slot_import.h" -#include "stats/stats.h" #include "status.h" #include "storage/redis_db.h" #include "unique_fd.h" -enum class MigrationType { kRedisCommand = 0, kRawKeyValue }; +enum class MigrationType { + /// Use Redis commands to migrate data. + /// It will trying to extract commands from existing data and log, then replay + /// them on the destination node. + kRedisCommand = 0, + /// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data. + /// + /// If downstream is not compatible with raw key-value, this migration type will + /// auto switch to kRedisCommand. + kRawKeyValue +}; enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed }; @@ -111,7 +118,7 @@ class SlotMigrator : public redis::Database { private: void loop(); void runMigrationProcess(); - bool isTerminated() { return thread_state_ == ThreadState::Terminated; } + bool isTerminated() const { return thread_state_ == ThreadState::Terminated; } Status startMigration(); Status sendSnapshot(); Status syncWAL(); @@ -158,11 +165,11 @@ class SlotMigrator : public redis::Database { enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd }; enum class ThreadState { Uninitialized, Running, Terminated }; - static const int kDefaultMaxPipelineSize = 16; - static const int kDefaultMaxMigrationSpeed = 4096; - static const int kDefaultSequenceGapLimit = 10000; - static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys - static const int kMaxLoopTimes = 10; + static constexpr int kDefaultMaxPipelineSize = 16; + static constexpr int kDefaultMaxMigrationSpeed = 4096; + static constexpr int kDefaultSequenceGapLimit = 10000; + static constexpr int kMaxItemsInCommand = 16; // number of items in every write command of complex keys + static constexpr int kMaxLoopTimes = 10; Server *srv_; @@ -183,7 +190,7 @@ class SlotMigrator : public redis::Database { std::thread t_; std::mutex job_mutex_; std::condition_variable job_cv_; - std::unique_ptr migration_job_; + std::unique_ptr migration_job_; // GUARDED_BY(job_mutex_) std::string dst_node_; std::string dst_ip_; diff --git a/src/stats/stats.h b/src/stats/stats.h index 6fdba09a194..e00506a9672 100644 --- a/src/stats/stats.h +++ b/src/stats/stats.h @@ -41,7 +41,7 @@ enum StatsMetricFlags { STATS_METRIC_COUNT }; -const int STATS_METRIC_SAMPLES = 16; // Number of samples per metric +constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric struct CommandStat { std::atomic calls;