Skip to content

Commit

Permalink
Cherry pick 3.4 (0203-0209) (#5327)
Browse files Browse the repository at this point in the history
* fix meta has not cleanup when reset (#5321)

* support upgrade from V2 to V3.4 (#5319)

Co-authored-by: Sophie <[email protected]>

* fix meta update (#5326)

* fix meta update

* fix bug

---------

Co-authored-by: Sophie <[email protected]>

---------

Co-authored-by: Doodle <[email protected]>
Co-authored-by: hs.zhang <[email protected]>
  • Loading branch information
3 people authored Feb 9, 2023
1 parent 5715cfd commit db3c1b3
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 50 deletions.
31 changes: 18 additions & 13 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,30 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
}

auto version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get());
LOG(INFO) << "Get meta version is " << static_cast<int32_t>(version);
if (version == nebula::meta::MetaVersion::UNKNOWN) {
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
LOG(ERROR) << "Can't upgrade meta from V1 to V3_4";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V2) {
LOG(ERROR) << "Can't upgrade meta from V2 to V3_4";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V3) {
if (!nebula::ok(version)) {
auto ret = nebula::meta::MetaVersionMan::updateMetaV3ToV3_4(engine);
if (!ret.ok()) {
LOG(ERROR) << "Update meta from V3 to V3_4 failed " << ret;
return nullptr;
}
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3_4);
} else {
auto v = nebula::value(version);
LOG(INFO) << "Get meta version is " << static_cast<int32_t>(v);
if (v == nebula::meta::MetaVersion::UNKNOWN) {
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (v == nebula::meta::MetaVersion::V1) {
LOG(ERROR) << "Can't upgrade meta from V1 to V3_4";
return nullptr;
} else if (v == nebula::meta::MetaVersion::V2) {
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3_4(engine);
if (!ret.ok()) {
LOG(ERROR) << "Update meta from V2 to V3_4 failed " << ret;
return nullptr;
}
}
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3_4);
LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
return kvstore;
}
Expand Down
12 changes: 12 additions & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "common/time/ScopedTimer.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/MetaKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "common/utils/OperationKeyUtils.h"
#include "common/utils/Utils.h"
Expand Down Expand Up @@ -487,6 +488,9 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, folly:
}

nebula::cpp2::ErrorCode Part::cleanup() {
if (spaceId_ == kDefaultSpaceId && partId_ == kDefaultPartId) {
return metaCleanup();
}
LOG(INFO) << idStr_ << "Clean rocksdb part data";
auto batch = engine_->startBatchWrite();
// Remove the vertex, edge, index, systemCommitKey, operation data under the part
Expand Down Expand Up @@ -557,5 +561,13 @@ nebula::cpp2::ErrorCode Part::cleanup() {
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
}

nebula::cpp2::ErrorCode Part::metaCleanup() {
std::string kMetaPrefix = "__";
auto firstKey = NebulaKeyUtils::firstKey(kMetaPrefix, 1);
auto lastKey = NebulaKeyUtils::lastKey(kMetaPrefix, 1);
// todo(doodle): since the poor performance of DeleteRange, perhaps we need to compact
return engine_->removeRange(firstKey, lastKey);
}

} // namespace kvstore
} // namespace nebula
9 changes: 8 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,19 @@ class Part : public raftex::RaftPart {
TermID committedLogTerm);

/**
* @brief clean up data in listener, called in RaftPart::reset
* @brief clean up data in storage part, called in RaftPart::reset
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode cleanup() override;

/**
* @brief clean up data in meta part, called in RaftPart::reset
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode metaCleanup();

public:
struct CallbackOptions {
GraphSpaceID spaceId;
Expand Down
146 changes: 128 additions & 18 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,19 @@ namespace meta {
static const std::string kMetaVersionKey = "__meta_version__"; // NOLINT

// static
MetaVersion MetaVersionMan::getMetaVersionFromKV(kvstore::KVStore* kv) {
ErrorOr<nebula::cpp2::ErrorCode, MetaVersion> MetaVersionMan::getMetaVersionFromKV(
kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
std::string value;
auto code = kv->get(kDefaultSpaceId, kDefaultPartId, kMetaVersionKey, &value, true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
auto version = *reinterpret_cast<const MetaVersion*>(value.data());
return version;
} else {
return getVersionByHost(kv);
return code;
}
}

// static
MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
const auto& hostPrefix = nebula::MetaKeyUtils::hostPrefix();
std::unique_ptr<nebula::kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, hostPrefix, &iter, true);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return MetaVersion::UNKNOWN;
}
if (iter->valid()) {
auto v1KeySize = hostPrefix.size() + sizeof(int64_t);
return (iter->key().size() == v1KeySize) ? MetaVersion::V1 : MetaVersion::V3_4;
}
// No hosts exists, regard as version 3
return MetaVersion::V3_4;
}

// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version) {
CHECK_NOTNULL(engine);
Expand All @@ -58,6 +43,44 @@ bool MetaVersionMan::setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion v
return code == nebula::cpp2::ErrorCode::SUCCEEDED;
}

Status MetaVersionMan::updateMetaV2ToV3_4(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());

std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) {
LOG(INFO) << "Make checkpoint dir: " << path << " failed";
return Status::Error("Create snapshot file failed");
}

std::string dataPath = folly::sformat("{}/data", path);
auto code = engine->createCheckpoint(dataPath);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}

auto status = doUpgradeV2ToV3(engine);
if (!status.ok()) {
// rollback by snapshot
return status;
}

status = doUpgradeV3ToV3_4(engine);

if (!status.ok()) {
// rollback by snapshot
return status;
}

// delete snapshot file
auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(INFO) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV3ToV3_4(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
Expand Down Expand Up @@ -89,6 +112,93 @@ Status MetaVersionMan::updateMetaV3ToV3_4(kvstore::KVEngine* engine) {
return Status::OK();
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) {
MetaDataUpgrade upgrader(engine);
// Step 1: Upgrade HeartBeat into machine list
{
// collect all hosts association with zone
std::vector<HostAddr> zoneHosts;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> zoneIter;
auto code = engine->prefix(zonePrefix, &zoneIter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

while (zoneIter->valid()) {
auto hosts = MetaKeyUtils::parseZoneHosts(zoneIter->val());
if (!hosts.empty()) {
zoneHosts.insert(zoneHosts.end(), hosts.begin(), hosts.end());
}
zoneIter->next();
}

const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

std::vector<kvstore::KV> data;
while (iter->valid()) {
auto info = HostInfo::decode(iter->val());

if (info.role_ == meta::cpp2::HostRole::STORAGE) {
// Save the machine information
auto host = MetaKeyUtils::parseHostKey(iter->key());
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
data.emplace_back(std::move(machineKey), "");

auto hostIt = std::find(zoneHosts.begin(), zoneHosts.end(), host);
if (hostIt == zoneHosts.end()) {
// Save the zone information
auto zoneName = folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port);
auto zoneKey = MetaKeyUtils::zoneKey(std::move(zoneName));
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
}
}
iter->next();
}
auto status = upgrader.saveMachineAndZone(std::move(data));
if (!status.ok()) {
LOG(INFO) << status;
return status;
}
}

// Step 2: Update Create space properties about Group
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Get spaces failed";
return Status::Error("Get spaces failed");
}

while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(INFO) << status;
return status;
}
iter->next();
}
}
if (!setMetaVersionToKV(engine, MetaVersion::V3)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
}
}

Status MetaVersionMan::doUpgradeV3ToV3_4(kvstore::KVEngine* engine) {
std::unique_ptr<kvstore::KVIterator> fulltextIter;
auto code = engine->prefix(MetaKeyUtils::fulltextIndexPrefix(), &fulltextIter);
Expand Down
9 changes: 6 additions & 3 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define META_METAVERSIONMAN_H_

#include "common/base/Base.h"
#include "common/base/ErrorOr.h"
#include "common/utils/MetaKeyUtils.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVStore.h"
Expand Down Expand Up @@ -35,16 +36,18 @@ class MetaVersionMan final {
* @param kv
* @return
*/
static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv);
static ErrorOr<nebula::cpp2::ErrorCode, MetaVersion> getMetaVersionFromKV(kvstore::KVStore* kv);

static bool setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version);

static Status updateMetaV2ToV3_4(kvstore::KVEngine* engine);

static Status updateMetaV3ToV3_4(kvstore::KVEngine* engine);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgradeV3ToV3_4(kvstore::KVEngine* engine);

static Status doUpgradeV2ToV3(kvstore::KVEngine* engine);
};

} // namespace meta
Expand Down
7 changes: 6 additions & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ void HBProcessor::process(const cpp2::HBReq& req) {

auto version = metaVersion_.load();
if (version == -1) {
metaVersion_.store(static_cast<int64_t>(MetaVersionMan::getMetaVersionFromKV(kvstore_)));
auto v = MetaVersionMan::getMetaVersionFromKV(kvstore_);
if (nebula::ok(v)) {
metaVersion_.store(static_cast<int64_t>(nebula::value(v)));
} else {
metaVersion_.store(static_cast<int64_t>(MetaVersion::V3_4));
}
}

resp_.meta_version_ref() = metaVersion_.load();
Expand Down
15 changes: 1 addition & 14 deletions src/tools/meta-dump/MetaDumpTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,7 @@ class MetaDumper {
}

if (!found) {
prefix = MetaKeyUtils::hostPrefix();
iter->Seek(rocksdb::Slice(prefix));
while (iter->Valid() && iter->key().starts_with(prefix)) {
found = true;
auto v1KeySize = prefix.size() + sizeof(int64_t);
auto version = (iter->key().size() == v1KeySize) ? MetaVersion::V1 : MetaVersion::V3_4;
LOG(INFO) << "Meta version=" << static_cast<int>(version);
iter->Next();
break;
}

if (!found) {
LOG(INFO) << "Meta version= Unknown";
}
LOG(INFO) << "Meta version= Unknown";
}
}
{
Expand Down

0 comments on commit db3c1b3

Please sign in to comment.