From 8854d9f05390d3c22e2c1d1970b410437d2004ca Mon Sep 17 00:00:00 2001 From: xuchaojie Date: Thu, 10 Feb 2022 11:42:35 +0800 Subject: [PATCH] curvefs client : fix bug of getleader always fails causes stack overflow --- .../client/rpcclient/metaserver_client.cpp | 22 +++++-- curvefs/src/client/rpcclient/task_excutor.cpp | 57 +++++++++-------- curvefs/src/client/rpcclient/task_excutor.h | 2 +- .../rpcclient/metaserver_client_test.cpp | 63 +++++++++++++++++++ .../rpcclient/mock_metaserver_service.h | 6 ++ 5 files changed, 119 insertions(+), 31 deletions(-) diff --git a/curvefs/src/client/rpcclient/metaserver_client.cpp b/curvefs/src/client/rpcclient/metaserver_client.cpp index 4e72fa7c7d..6965f3379c 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.cpp +++ b/curvefs/src/client/rpcclient/metaserver_client.cpp @@ -586,8 +586,15 @@ void MetaServerClientImpl::UpdateInodeAsync(const Inode &inode, MetaServerOpType::UpdateInode, task, inode.fsid(), inode.inodeid()); auto excutor = std::make_shared(opt_, metaCache_, channelManager_, taskCtx); - TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done); - excutor->DoAsyncRPCTask(taskDone); + TaskExecutorDone *taskDone = new TaskExecutorDone( + excutor, done); + brpc::ClosureGuard taskDone_guard(taskDone); + int ret = excutor->DoAsyncRPCTask(taskDone); + if (ret < 0) { + taskDone->SetRetCode(ret); + return; + } + taskDone_guard.release(); } MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo( @@ -741,8 +748,15 @@ void MetaServerClientImpl::GetOrModifyS3ChunkInfoAsync( MetaServerOpType::GetOrModifyS3ChunkInfo, task, fsId, inodeId); auto excutor = std::make_shared(opt_, metaCache_, channelManager_, taskCtx); - TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done); - excutor->DoAsyncRPCTask(taskDone); + TaskExecutorDone *taskDone = new TaskExecutorDone( + excutor, done); + brpc::ClosureGuard taskDone_guard(taskDone); + int ret = excutor->DoAsyncRPCTask(taskDone); + if (ret < 0) { + taskDone->SetRetCode(ret); + return; + } + taskDone_guard.release(); } MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam ¶m, diff --git a/curvefs/src/client/rpcclient/task_excutor.cpp b/curvefs/src/client/rpcclient/task_excutor.cpp index 0bd7b7b61b..d8e37a7f06 100644 --- a/curvefs/src/client/rpcclient/task_excutor.cpp +++ b/curvefs/src/client/rpcclient/task_excutor.cpp @@ -79,37 +79,38 @@ int TaskExecutor::DoRPCTask() { return retCode; } -void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) { - brpc::ClosureGuard done_guard(done); +int TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) { task_->rpcTimeoutMs = opt_.rpcTimeoutMS; + int retCode = -1; - if (task_->retryTimes++ > opt_.maxRetry) { - LOG(ERROR) << task_->TaskContextStr() - << " retry times exceeds the limit"; - done->SetRetCode(retCode); - return; - } + do { + if (task_->retryTimes++ > opt_.maxRetry) { + LOG(ERROR) << task_->TaskContextStr() + << " retry times exceeds the limit"; + break; + } - if (!HasValidTarget() && !GetTarget()) { - LOG(WARNING) << "get target fail for " << task_->TaskContextStr() - << ", sleep and retry"; - done->SetRetCode(retCode); - return; - } + if (!HasValidTarget() && !GetTarget()) { + LOG(WARNING) << "get target fail for " << task_->TaskContextStr() + << ", sleep and retry"; + bthread_usleep(opt_.retryIntervalUS); + continue; + } - auto channel = channelManager_->GetOrCreateChannel( - task_->target.metaServerID, task_->target.endPoint); - if (!channel) { - LOG(WARNING) << "GetOrCreateChannel fail for " - << task_->TaskContextStr() << ", sleep and retry"; - done->SetRetCode(retCode); - return; - } + auto channel = channelManager_->GetOrCreateChannel( + task_->target.metaServerID, task_->target.endPoint); + if (!channel) { + LOG(WARNING) << "GetOrCreateChannel fail for " + << task_->TaskContextStr() << ", sleep and retry"; + bthread_usleep(opt_.retryIntervalUS); + continue; + } + retCode = ExcuteTask(channel.get(), done); + break; + } while (true); - ExcuteTask(channel.get(), done); - done_guard.release(); - return; + return retCode; } bool TaskExecutor::OnReturn(int retCode) { @@ -303,7 +304,11 @@ void TaskExecutorDone::Run() { needRetry = excutor_->OnReturn(code_); if (needRetry) { excutor_->PreProcessBeforeRetry(code_); - excutor_->DoAsyncRPCTask(this); + code_ = excutor_->DoAsyncRPCTask(this); + if (code_ < 0) { + done_->SetMetaStatusCode(ConvertToMetaStatusCode(code_)); + return; + } self_guard.release(); done_guard.release(); } else { diff --git a/curvefs/src/client/rpcclient/task_excutor.h b/curvefs/src/client/rpcclient/task_excutor.h index 17157690ef..0849f95356 100644 --- a/curvefs/src/client/rpcclient/task_excutor.h +++ b/curvefs/src/client/rpcclient/task_excutor.h @@ -106,7 +106,7 @@ class TaskExecutor { } int DoRPCTask(); - void DoAsyncRPCTask(TaskExecutorDone *done); + int DoAsyncRPCTask(TaskExecutorDone *done); bool OnReturn(int retCode); void PreProcessBeforeRetry(int retCode); diff --git a/curvefs/test/client/rpcclient/metaserver_client_test.cpp b/curvefs/test/client/rpcclient/metaserver_client_test.cpp index 260092f9a0..a0a469e465 100644 --- a/curvefs/test/client/rpcclient/metaserver_client_test.cpp +++ b/curvefs/test/client/rpcclient/metaserver_client_test.cpp @@ -749,6 +749,69 @@ TEST_F(MetaServerClientImplTest, test_UpdateInode) { ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); } +TEST_F(MetaServerClientImplTest, test_GetOrModifyS3ChunkInfo) { + uint32_t fsId = 1; + uint64_t inodeId = 100; + google::protobuf::Map< + uint64_t, S3ChunkInfoList> s3ChunkInfos; + bool returnS3ChunkInfoMap = true; + google::protobuf::Map< + uint64_t, S3ChunkInfoList> out; + uint64_t applyIndex = 10; + + // test1: success + curvefs::metaserver::GetOrModifyS3ChunkInfoResponse response; + response.set_statuscode(curvefs::metaserver::OK); + response.set_appliedindex(applyIndex); + EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _)) + .WillOnce(DoAll( + SetArgPointee<2>(response), + Invoke(SetRpcService< + curvefs::metaserver::GetOrModifyS3ChunkInfoRequest, + curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>))); + EXPECT_CALL(*mockMetacache_.get(), GetTarget(_, _, _, _, _)) + .WillRepeatedly(DoAll(SetArgPointee<2>(target_), + SetArgPointee<3>(applyIndex), Return(true))); + EXPECT_CALL(*mockMetacache_.get(), UpdateApplyIndex(_, _)); + + MetaStatusCode status = metaserverCli_.GetOrModifyS3ChunkInfo( + fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out); + + ASSERT_EQ(MetaStatusCode::OK, status); + + // test2: overload + response.set_statuscode(curvefs::metaserver::OVERLOAD); + EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _)) + .WillRepeatedly(DoAll( + SetArgPointee<2>(response), + Invoke(SetRpcService< + curvefs::metaserver::GetOrModifyS3ChunkInfoRequest, + curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>))); + status = metaserverCli_.GetOrModifyS3ChunkInfo( + fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out); + ASSERT_EQ(MetaStatusCode::OVERLOAD, status); + + // test3: has no applyIndex + response.set_statuscode(curvefs::metaserver::OK); + response.clear_appliedindex(); + EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _)) + .WillRepeatedly(DoAll( + SetArgPointee<2>(response), + Invoke(SetRpcService< + curvefs::metaserver::GetOrModifyS3ChunkInfoRequest, + curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>))); + status = metaserverCli_.GetOrModifyS3ChunkInfo( + fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out); + ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); + + // test4: get target always fail + EXPECT_CALL(*mockMetacache_.get(), GetTarget(_, _, _, _, _)) + .WillRepeatedly(Return(false)); + status = metaserverCli_.GetOrModifyS3ChunkInfo( + fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out); + ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); +} + TEST_F(MetaServerClientImplTest, test_CreateInode) { // in InodeParam inode; diff --git a/curvefs/test/client/rpcclient/mock_metaserver_service.h b/curvefs/test/client/rpcclient/mock_metaserver_service.h index db20ea691b..6990c09705 100644 --- a/curvefs/test/client/rpcclient/mock_metaserver_service.h +++ b/curvefs/test/client/rpcclient/mock_metaserver_service.h @@ -83,6 +83,12 @@ class MockMetaServerService : public curvefs::metaserver::MetaServerService { const ::curvefs::metaserver::DeleteInodeRequest *request, ::curvefs::metaserver::DeleteInodeResponse *response, ::google::protobuf::Closure *done)); + + MOCK_METHOD4(GetOrModifyS3ChunkInfo, + void(::google::protobuf::RpcController *controller, + const ::curvefs::metaserver::GetOrModifyS3ChunkInfoRequest *request, + ::curvefs::metaserver::GetOrModifyS3ChunkInfoResponse *response, + ::google::protobuf::Closure *done)); }; } // namespace rpcclient } // namespace client