Skip to content

Commit

Permalink
curvefs client: fix stack overflow caused by getleader always failing
Browse files Browse the repository at this point in the history
  • Loading branch information
xu-chaojie committed Feb 10, 2022
1 parent 5e0125e commit 9186ceb
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 31 deletions.
22 changes: 18 additions & 4 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,15 @@ void MetaServerClientImpl::UpdateInodeAsync(const Inode &inode,
MetaServerOpType::UpdateInode, task, inode.fsid(), inode.inodeid());
auto excutor = std::make_shared<UpdateInodeExcutor>(opt_,
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done);
excutor->DoAsyncRPCTask(taskDone);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
}

MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo(
Expand Down Expand Up @@ -741,8 +748,15 @@ void MetaServerClientImpl::GetOrModifyS3ChunkInfoAsync(
MetaServerOpType::GetOrModifyS3ChunkInfo, task, fsId, inodeId);
auto excutor = std::make_shared<GetOrModifyS3ChunkInfoExcutor>(opt_,
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done);
excutor->DoAsyncRPCTask(taskDone);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
}

MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param,
Expand Down
57 changes: 31 additions & 26 deletions curvefs/src/client/rpcclient/task_excutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,38 @@ int TaskExecutor::DoRPCTask() {
return retCode;
}

void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
brpc::ClosureGuard done_guard(done);
int TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
task_->rpcTimeoutMs = opt_.rpcTimeoutMS;

int retCode = -1;

if (task_->retryTimes++ > opt_.maxRetry) {
LOG(ERROR) << task_->TaskContextStr()
<< " retry times exceeds the limit";
done->SetRetCode(retCode);
return;
}
do {
if (task_->retryTimes++ > opt_.maxRetry) {
LOG(ERROR) << task_->TaskContextStr()
<< " retry times exceeds the limit";
break;
}

if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
done->SetRetCode(retCode);
return;
}
if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}

auto channel = channelManager_->GetOrCreateChannel(
task_->target.metaServerID, task_->target.endPoint);
if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
done->SetRetCode(retCode);
return;
}
auto channel = channelManager_->GetOrCreateChannel(
task_->target.metaServerID, task_->target.endPoint);
if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}
retCode = ExcuteTask(channel.get(), done);
break;
} while (true);

ExcuteTask(channel.get(), done);
done_guard.release();
return;
return retCode;
}

bool TaskExecutor::OnReturn(int retCode) {
Expand Down Expand Up @@ -303,7 +304,11 @@ void TaskExecutorDone::Run() {
needRetry = excutor_->OnReturn(code_);
if (needRetry) {
excutor_->PreProcessBeforeRetry(code_);
excutor_->DoAsyncRPCTask(this);
code_ = excutor_->DoAsyncRPCTask(this);
if (code_ < 0) {
done_->SetMetaStatusCode(ConvertToMetaStatusCode(code_));
return;
}
self_guard.release();
done_guard.release();
} else {
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/rpcclient/task_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TaskExecutor {
}

int DoRPCTask();
void DoAsyncRPCTask(TaskExecutorDone *done);
int DoAsyncRPCTask(TaskExecutorDone *done);

bool OnReturn(int retCode);
void PreProcessBeforeRetry(int retCode);
Expand Down

0 comments on commit 9186ceb

Please sign in to comment.