Skip to content

Commit

Permalink
curvefs/client: fix the data iteration error when rpc retry.
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed May 20, 2022
1 parent d868004 commit 126774c
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 152 deletions.
4 changes: 2 additions & 2 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr(
return CURVEFS_ERROR::OK;
}

MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, inodeIds, attr);
MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, *inodeIds, attr);
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetInodeAttr failed, MetaStatusCode = "
<< ret << ", MetaStatusCode_Name = "
Expand Down Expand Up @@ -150,7 +150,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetXAttr(
return CURVEFS_ERROR::OK;
}

MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, inodeIds, xattr);
MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, *inodeIds, xattr);
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetXAttr failed, MetaStatusCode = "
<< ret << ", MetaStatusCode_Name = "
Expand Down
290 changes: 156 additions & 134 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,9 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
bool GroupInodeIdByPartition(
uint32_t fsId,
std::shared_ptr<MetaCache> metaCache,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::unordered_map<uint32_t, std::list<uint64_t>> *inodeGroups) {
for (const auto &it : *inodeIds) {
for (const auto &it : inodeIds) {
uint32_t pId = 0;
if (metaCache->GetPartitionIdByInodeId(fsId, it, &pId)) {
auto iter = inodeGroups->find(pId);
Expand All @@ -497,163 +497,185 @@ bool GroupInodeIdByPartition(
return true;
}

bool MetaServerClientImpl::SplitRequestInodes(
uint32_t fsId,
const std::set<uint64_t> &inodeIds,
std::vector<std::list<uint64_t>> *inodeGroups) {
std::unordered_map<uint32_t, std::list<uint64_t>> groups;
bool ret = GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &groups);
if (ret) {
for (const auto &it : groups) {
auto iter = it.second.begin();
while (iter != it.second.end()) {
std::list<uint64_t> tmp;
uint32_t batchLimit = opt_.batchLimit;
while (iter != it.second.end() && batchLimit > 0) {
tmp.emplace_back(*iter);
iter++;
batchLimit--;
}
inodeGroups->emplace_back(std::move(tmp));
}
}
return true;
}
return false;
}


MetaStatusCode MetaServerClientImpl::BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<InodeAttr> *attr) {
uint32_t limit = opt_.batchLimit;
// group inodeid by partition
std::unordered_map<uint32_t, std::list<uint64_t>> inodeGroups;
if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) {
// group inodeid by partition and batchlimit
std::vector<std::list<uint64_t>> inodeGroups;
if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) {
return MetaStatusCode::NOT_FOUND;
}

// send rpc
for (const auto &it : inodeGroups) {
auto iter = it.second.begin();
while (iter != it.second.end()) {
uint64_t inodeId = *iter;
auto task = RPCTask {
metaserverClientMetric_->batchGetInodeAttr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetInodeAttr.latency);
BatchGetInodeAttrRequest request;
BatchGetInodeAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(CopysetGroupID(poolID,
copysetID)));
uint32_t batchLimit = limit;
while (iter != it.second.end() && batchLimit > 0) {
request.add_inodeid(*iter);
iter++;
batchLimit--;
}
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetInodeAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetInodeAttr.eps.count << 1;
LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}
uint64_t inodeId = *it.begin();
auto task = RPCTask {
metaserverClientMetric_->batchGetInodeAttr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetInodeAttr.latency);
BatchGetInodeAttrRequest request;
BatchGetInodeAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(CopysetGroupID(poolID,
copysetID)));
for (const auto &id : it) {
request.add_inodeid(id);
}
if (request.inodeid_size() <= 0) {
LOG(WARNING) << "BatchGetInodeAttr request empty.";
return MetaStatusCode::PARAM_ERROR;
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetInodeAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.attr_size() > 0 &&
response.has_appliedindex()) {
auto retAttr = response.attr();
for_each(retAttr.begin(), retAttr.end(),
[&](InodeAttr &a) { attr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetInodeAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetInodeAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetInodeAttr.eps.count << 1;
LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
attr->clear();
return ret;
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetInodeAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.attr_size() > 0 &&
response.has_appliedindex()) {
auto retAttr = response.attr();
for_each(retAttr.begin(), retAttr.end(),
[&](InodeAttr &a) { attr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetInodeAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
if (ret != MetaStatusCode::OK) {
attr->clear();
return ret;
}
}
return MetaStatusCode::OK;
}

MetaStatusCode MetaServerClientImpl::BatchGetXAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<XAttr> *xattr) {
uint32_t limit = opt_.batchLimit;
// group inodeid by partition
std::unordered_map<uint32_t, std::list<uint64_t>> inodeGroups;
if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) {
// group inodeid by partition and batchlimit
std::vector<std::list<uint64_t>> inodeGroups;
if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) {
return MetaStatusCode::NOT_FOUND;
}

// send rpc
for (const auto &it : inodeGroups) {
auto iter = it.second.begin();
while (iter != it.second.end()) {
uint64_t inodeId = *iter;
auto task = RPCTask {
metaserverClientMetric_->batchGetXattr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetXattr.latency);
BatchGetXAttrRequest request;
BatchGetXAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(
CopysetGroupID(poolID, copysetID)));
uint32_t batchLimit = limit;
while (iter != it.second.end() && batchLimit > 0) {
request.add_inodeid(*iter);
iter++;
batchLimit--;
}
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetXAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetXattr.eps.count << 1;
LOG(WARNING) << "BatchGetXAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}
uint64_t inodeId = *it.begin();
auto task = RPCTask {
metaserverClientMetric_->batchGetXattr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetXattr.latency);
BatchGetXAttrRequest request;
BatchGetXAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(
CopysetGroupID(poolID, copysetID)));
for (const auto &id : it) {
request.add_inodeid(id);
}
if (request.inodeid_size() <= 0) {
LOG(WARNING) << "BatchGetInodeXAttr request empty.";
return MetaStatusCode::PARAM_ERROR;
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetXAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.xattr_size() > 0 &&
response.has_appliedindex()) {
auto retXattr = response.xattr();
for_each(retXattr.begin(), retXattr.end(),
[&](XAttr &a) { xattr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetXAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetXAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetXattr.eps.count << 1;
LOG(WARNING) << "BatchGetXAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
xattr->clear();
return ret;
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetXAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.xattr_size() > 0 &&
response.has_appliedindex()) {
auto retXattr = response.xattr();
for_each(retXattr.begin(), retXattr.end(),
[&](XAttr &a) { xattr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetXAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
if (ret != MetaStatusCode::OK) {
xattr->clear();
return ret;
}
}
return MetaStatusCode::OK;
Expand Down
16 changes: 12 additions & 4 deletions curvefs/src/client/rpcclient/metaserver_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ class MetaServerClient {
Inode *out, bool* streaming) = 0;

virtual MetaStatusCode BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<InodeAttr> *attr) = 0;

virtual MetaStatusCode BatchGetXAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<XAttr> *xattr) = 0;

virtual MetaStatusCode UpdateInode(const Inode &inode,
Expand Down Expand Up @@ -125,6 +125,10 @@ class MetaServerClient {
virtual MetaStatusCode CreateInode(const InodeParam &param, Inode *out) = 0;

virtual MetaStatusCode DeleteInode(uint32_t fsId, uint64_t inodeid) = 0;

virtual bool SplitRequestInodes(uint32_t fsId,
const std::set<uint64_t> &inodeIds,
std::vector<std::list<uint64_t>> *inodeGroups) = 0;
};

class MetaServerClientImpl : public MetaServerClient {
Expand Down Expand Up @@ -162,11 +166,11 @@ class MetaServerClientImpl : public MetaServerClient {
Inode *out, bool* streaming) override;

MetaStatusCode BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<InodeAttr> *attr) override;

MetaStatusCode BatchGetXAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<XAttr> *xattr) override;

MetaStatusCode UpdateInode(const Inode &inode,
Expand Down Expand Up @@ -195,6 +199,10 @@ class MetaServerClientImpl : public MetaServerClient {

MetaStatusCode DeleteInode(uint32_t fsId, uint64_t inodeid) override;

bool SplitRequestInodes(uint32_t fsId,
const std::set<uint64_t> &inodeIds,
std::vector<std::list<uint64_t>> *inodeGroups) override;

private:
bool ParseS3MetaStreamBuffer(butil::IOBuf* buffer,
uint64_t* chunkIndex,
Expand Down
Loading

0 comments on commit 126774c

Please sign in to comment.