Skip to content

Commit

Permalink
curvefs/client: optimize DiskCacheManager::UmountDiskCache()
Browse files Browse the repository at this point in the history
Signed-off-by: Ziy1-Tan <[email protected]>
  • Loading branch information
Ziy1-Tan committed Mar 9, 2023
1 parent 87664bd commit 974cdce
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 51 deletions.
14 changes: 7 additions & 7 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ int DiskCacheManager::Init(std::shared_ptr<S3Client> client,
return ret;
}

// start aync upload thread
// start async upload thread
cacheWrite_->AsyncUploadRun();
std::thread uploadThread =
std::thread(&DiskCacheManager::UploadAllCacheWriteFile, this);
Expand Down Expand Up @@ -172,16 +172,16 @@ bool DiskCacheManager::IsCached(const std::string name) {
return true;
}

bool DiskCacheManager::IsCacheClean() {
return cacheWrite_->IsCacheClean();
}

int DiskCacheManager::UmountDiskCache() {
LOG(INFO) << "umount disk cache.";
int ret;
diskInitThread_.join();
ret = cacheWrite_->UploadAllCacheWriteFile();
if (ret < 0) {
LOG(ERROR) << "umount disk cache error.";
}
TrimStop();
cacheWrite_->AsyncUploadStop();
LOG_IF(ERROR, !IsCacheClean()) << "umount disk cache error.";
LOG(INFO) << "umount disk cache end.";
return 0;
}
Expand Down Expand Up @@ -211,7 +211,7 @@ int DiskCacheManager::CreateDir() {
LOG(ERROR) << "create cache read dir error. ret = " << ret;
return ret;
}
VLOG(9) << "create cache dir sucess.";
VLOG(9) << "create cache dir success.";
return 0;
}

Expand Down
9 changes: 7 additions & 2 deletions curvefs/src/client/s3/disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class DiskCacheManager {
void InitMetrics(const std::string &fsName);

/**
* @brief: has geted the origin used size or not.
* @brief: has got the origin used size or not.
*/
virtual bool IsDiskUsedInited() {
return diskUsedInit_.load();
Expand Down Expand Up @@ -160,6 +160,11 @@ class DiskCacheManager {
*/
bool IsExceedFileNums();

/**
* @brief check whether cache dir does not exist or there is no cache file
*/
bool IsCacheClean();

curve::common::Thread backEndThread_;
curve::common::Atomic<bool> isRunning_;
curve::common::InterruptibleSleeper sleeper_;
Expand Down Expand Up @@ -188,7 +193,7 @@ class DiskCacheManager {

S3ClientAdaptorOption option_;

// has geted the origin used size or not
// has got the origin used size or not
std::atomic<bool> diskUsedInit_;
curve::common::Thread diskInitThread_;
};
Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void DiskCacheManagerImpl::Enqueue(
int DiskCacheManagerImpl::WriteReadDirectClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteReadClosure start, name: " << context->key;
// Write to read cache, we don't care if the cache wirte success
// Write to read cache, we don't care if the cache write success
int ret = WriteReadDirect(context->key,
context->buffer, context->bufferSize);
VLOG(9) << "WriteReadClosure end, name: " << context->key;
Expand Down Expand Up @@ -186,13 +186,13 @@ bool DiskCacheManagerImpl::IsDiskCacheFull() {
}

int DiskCacheManagerImpl::UmountDiskCache() {
taskPool_.Stop();
int ret;
ret = diskCacheManager_->UmountDiskCache();
if (ret < 0) {
LOG(ERROR) << "umount disk cache error.";
return -1;
}
taskPool_.Stop();
client_->Deinit();
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class DiskCacheManagerImpl {
* @brief Write obj
* @param[in] name obj name
* @param[in] buf what to write
* @param[in] length wtite length
* @param[in] length write length
* @return success: write length, fail : < 0
*/
int Write(const std::string name, const char *buf, uint64_t length);
Expand Down
44 changes: 34 additions & 10 deletions curvefs/src/client/s3/disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void DiskCacheWrite::Init(std::shared_ptr<S3Client> client,
}

void DiskCacheWrite::AsyncUploadEnqueue(const std::string objName) {
std::lock_guard<bthread::Mutex> lk(mtx_);
std::lock_guard<std::mutex> lock(mtx_);
waitUpload_.push_back(objName);
}

Expand Down Expand Up @@ -139,7 +139,7 @@ int DiskCacheWrite::UploadFile(const std::string &name,
[&, buffer, syncTask, name]
(const std::shared_ptr<PutObjectAsyncContext> &context) {
if (context->retCode == 0) {
if (metric_.get() != nullptr) {
if (metric_ != nullptr) {
metric_->writeS3.bps.count << context->bufferSize;
metric_->writeS3.qps.count << 1;
metric_->writeS3.latency
Expand Down Expand Up @@ -186,7 +186,7 @@ bool DiskCacheWrite::WriteCacheValid() {

int DiskCacheWrite::GetUploadFile(const std::string &inode,
std::list<std::string> *toUpload) {
std::unique_lock<bthread::Mutex> lk(mtx_);
std::unique_lock<std::mutex> lock(mtx_);
if (waitUpload_.empty()) {
return 0;
}
Expand All @@ -207,11 +207,11 @@ int DiskCacheWrite::GetUploadFile(const std::string &inode,
}

int DiskCacheWrite::FileExist(const std::string &inode) {
// load all write cacahe
// load all write cache
std::set<std::string> cachedObj;
int ret = LoadAllCacheFile(&cachedObj);
if (ret < 0) {
LOG(ERROR) << "DiskCacheWrite, load all cacched file fail ret = "
LOG(ERROR) << "DiskCacheWrite, load all cached file fail ret = "
<< ret;
return ret;
}
Expand Down Expand Up @@ -274,6 +274,7 @@ int DiskCacheWrite::AsyncUploadFunc() {
}

std::list<std::string> toUpload;
std::shared_ptr<SynchronizationTask> syncTask;

VLOG(3) << "async upload function start.";
while (sleeper_.wait_for(std::chrono::milliseconds(asyncLoadPeriodMs_))) {
Expand All @@ -282,13 +283,23 @@ int DiskCacheWrite::AsyncUploadFunc() {
return 0;
}
toUpload.clear();
if (GetUploadFile("", &toUpload) <= 0) {
int num = GetUploadFile("", &toUpload);
if (num <= 0) {
std::unique_lock<std::mutex> lock(mtx_);
if (waitUpload_.empty()) {
cond_.notify_all();
}
continue;
}
VLOG(6) << "async upload file size = " << toUpload.size();
UploadFile(toUpload, nullptr);
VLOG(6) << "async upload file size = " << num;
syncTask.reset(new SynchronizationTask(num));
UploadFile(toUpload, syncTask);
VLOG(6) << "async upload all files";
}

if (syncTask) {
syncTask->Wait();
}
return 0;
}

Expand All @@ -303,15 +314,20 @@ int DiskCacheWrite::AsyncUploadRun() {
}

int DiskCacheWrite::AsyncUploadStop() {
if (isRunning_.load()) {
std::unique_lock<std::mutex> lock(mtx_);
while (!waitUpload_.empty()) {
cond_.wait_for(lock, std::chrono::milliseconds(asyncLoadPeriodMs_));
}
}
if (isRunning_.exchange(false)) {
LOG(INFO) << "stop AsyncUpload thread...";
sleeper_.interrupt();
backEndThread_.join();
LOG(INFO) << "stop AsyncUpload thread ok.";
return -1;
} else {
LOG(INFO) << "AsyncUpload thread not running.";
}
LOG(INFO) << "AsyncUpload thread not running.";
return 0;
}

Expand Down Expand Up @@ -458,5 +474,13 @@ int DiskCacheWrite::WriteDiskFile(const std::string fileName, const char *buf,
return writeLen;
}

bool DiskCacheWrite::IsCacheClean() {
if (!WriteCacheValid()) {
return true;
}
std::set<std::string> objs;
return LoadAllCacheFile(&objs) == 0 && objs.empty();
}

} // namespace client
} // namespace curvefs
24 changes: 15 additions & 9 deletions curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ class DiskCacheWrite : public DiskCacheBase {
public:
class SynchronizationTask {
public:
explicit SynchronizationTask(int enventNum) {
countDownEnvent_.Reset(enventNum);
explicit SynchronizationTask(int eventNum) {
countDownEvent_.Reset(eventNum);
errorCount_ = 0;
}
void Wait() { countDownEnvent_.Wait(); }
void Wait() { countDownEvent_.Wait(); }

void Signal() { countDownEnvent_.Signal(); }
void Signal() { countDownEvent_.Signal(); }

void SetError() { errorCount_.fetch_add(1); }

bool Success() { return errorCount_ == 0; }

public:
curve::common::CountDownEvent countDownEnvent_;
curve::common::CountDownEvent countDownEvent_;
std::atomic<int> errorCount_;
};

Expand All @@ -85,7 +85,7 @@ class DiskCacheWrite : public DiskCacheBase {
const std::string cacheDir, uint64_t asyncLoadPeriodMs,
std::shared_ptr<SglLRUCache<std::string>> cachedObjName);
/**
* @brief write obj to write cahce disk
* @brief write obj to write cache disk
* @param[in] client S3Client
* @param[in] option config option
* @return success: 0, fail : < 0
Expand Down Expand Up @@ -116,7 +116,7 @@ class DiskCacheWrite : public DiskCacheBase {
virtual int UploadFileByInode(const std::string &inode);

/**
* @brief: start aync upload thread
* @brief: start async upload thread
*/
virtual int AsyncUploadRun();
/**
Expand All @@ -125,14 +125,19 @@ class DiskCacheWrite : public DiskCacheBase {
*/
virtual void AsyncUploadEnqueue(const std::string objName);
/**
* @brief: stop aync upload thread.
* @brief: stop async upload thread.
*/
virtual int AsyncUploadStop();

virtual void InitMetrics(std::shared_ptr<DiskCacheMetric> metric) {
metric_ = metric;
}

/**
* @brief check that cache dir does not exist or there is no cache file
*/
virtual bool IsCacheClean();

private:
using DiskCacheBase::Init;
int AsyncUploadFunc();
Expand All @@ -146,7 +151,8 @@ class DiskCacheWrite : public DiskCacheBase {
curve::common::Thread backEndThread_;
curve::common::Atomic<bool> isRunning_;
std::list<std::string> waitUpload_;
bthread::Mutex mtx_;
std::mutex mtx_;
std::condition_variable cond_;
InterruptibleSleeper sleeper_;
uint64_t asyncLoadPeriodMs_;
std::shared_ptr<S3Client> client_;
Expand Down
4 changes: 3 additions & 1 deletion curvefs/test/client/mock_disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MockDiskCacheWrite : public DiskCacheWrite {
const char* buf, uint64_t length, bool force));

MOCK_METHOD1(CreateIoDir,
int(bool writreDir));
int(bool writeDir));

MOCK_METHOD1(IsFileExist,
bool(const std::string file));
Expand All @@ -66,6 +66,8 @@ class MockDiskCacheWrite : public DiskCacheWrite {
MOCK_METHOD1(AsyncUploadEnqueue,
void(const std::string objName));
MOCK_METHOD1(UploadFileByInode, int(const std::string &inode));

MOCK_METHOD0(IsCacheClean, bool());
};

} // namespace client
Expand Down
12 changes: 0 additions & 12 deletions curvefs/test/client/test_disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ TEST_F(TestDiskCacheManager, TrimRun_1) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull()))
Expand Down Expand Up @@ -312,8 +310,6 @@ TEST_F(TestDiskCacheManager, TrimCache_2) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -349,8 +345,6 @@ TEST_F(TestDiskCacheManager, TrimCache_4) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -387,8 +381,6 @@ TEST_F(TestDiskCacheManager, TrimCache_5) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -428,8 +420,6 @@ TEST_F(TestDiskCacheManager, TrimCache_noexceed) {
.WillOnce(Return(0))
.WillOnce(Return(-1))
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
int ret = diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
Expand Down Expand Up @@ -473,8 +463,6 @@ TEST_F(TestDiskCacheManager, TrimCache_exceed) {
.Times(2)
.WillOnce(Return(-1))
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
Expand Down
6 changes: 2 additions & 4 deletions curvefs/test/client/test_disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,12 @@ TEST_F(TestDiskCacheManagerImpl, IsCached) {
}

TEST_F(TestDiskCacheManagerImpl, UmountDiskCache) {
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, IsCacheClean()).WillOnce(Return(true));
diskCacheManagerImpl_->InitMetrics("test");
int ret = diskCacheManagerImpl_->UmountDiskCache();
ASSERT_EQ(0, ret);

EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, IsCacheClean()).WillOnce(Return(false));
diskCacheManagerImpl_->InitMetrics("test");
ret = diskCacheManagerImpl_->UmountDiskCache();
ASSERT_EQ(0, ret);
Expand Down
11 changes: 8 additions & 3 deletions curvefs/test/client/test_disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,15 @@ TEST_F(TestDiskCacheWrite, AsyncUploadRun) {
sleep(1);
diskCacheWrite_->AsyncUploadEnqueue("test");
std::string t1 = "test";
curve::common::Thread backEndThread =
std::thread(&DiskCacheWrite::AsyncUploadEnqueue, diskCacheWrite_, t1);
std::vector<curve::common::Thread> threads;
for (int i = 0; i < 5; i++) {
threads.emplace_back(&DiskCacheWrite::AsyncUploadEnqueue,
diskCacheWrite_, t1);
}
diskCacheWrite_->AsyncUploadStop();
backEndThread.join();
for (auto &t : threads) {
t.join();
}
}

TEST_F(TestDiskCacheWrite, UploadFileByInode) {
Expand Down

0 comments on commit 974cdce

Please sign in to comment.