Skip to content

Commit

Permalink
curvefs: a curve filesystem can be mounted by multi fuse clients
Browse files Browse the repository at this point in the history
NOTES:
Now we implemented close-to-open consistency and multi-mountpoint for one filesystem.
A file written by a client, closed, and then opened by any client can read the latest data.
The scenario where multiple mount points modify the same resource ( Any modification to a
file ) at the same time is not guaranteed.
  • Loading branch information
ilixiaocui committed Feb 28, 2022
1 parent 95dbd19 commit f75daad
Show file tree
Hide file tree
Showing 22 changed files with 451 additions and 301 deletions.
3 changes: 3 additions & 0 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ fuseClient.iCacheLruSize=65536
fuseClient.dCacheLruSize=65536
fuseClient.enableICacheMetrics=true
fuseClient.enableDCacheMetrics=true
fuseClient.cto=true
# FuseOpFlush retry intervel, default is 2s
fuseClient.flushRetryIntervalMs=2000

#### volume
volume.bigFileSize=1048576
Expand Down
10 changes: 8 additions & 2 deletions curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ message Inode {
optional VolumeExtentList volumeExtentList = 17; // TYPE_FILE only
map<uint64, S3ChunkInfoList> s3ChunkInfoMap = 18; // TYPE_S3 only, first is chunk index
optional uint32 dtime = 19;
optional bool openflag = 20;
optional uint32 openmpcount = 20; // openmpcount mount points had the file open
}

message GetInodeResponse {
Expand Down Expand Up @@ -239,6 +239,12 @@ message CreateRootInodeResponse {
optional uint64 appliedIndex = 2;
}

enum InodeOpenStatusChange {
OPEN = 1;
CLOSE = 2;
NOCHANGE = 3;
}

message UpdateInodeRequest {
required uint32 poolId = 1;
required uint32 copysetId = 2;
Expand All @@ -258,7 +264,7 @@ message UpdateInodeRequest {
optional VolumeExtentList volumeExtentList = 16;
map<uint64, S3ChunkInfoList> s3ChunkInfoMap = 17;
optional uint32 nlink = 18;
optional bool openflag = 19;
optional InodeOpenStatusChange inodeOpenstatusChange = 19;
}

message UpdateInodeResponse {
Expand Down
6 changes: 5 additions & 1 deletion curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DECLARE_int32(health_check_interval);
namespace curvefs {
namespace client {
namespace common {
DEFINE_bool(enableCto, true, "acheieve cto consistency");

void InitMdsOption(Configuration *conf, MdsOption *mdsOpt) {
conf->GetValueFatalIfFail("mdsOpt.mdsMaxRetryMS", &mdsOpt->mdsMaxRetryMS);
conf->GetValueFatalIfFail("mdsOpt.rpcRetryOpt.maxRPCTimeoutMS",
Expand Down Expand Up @@ -210,7 +212,9 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
&clientOption->enableICacheMetrics);
conf->GetValueFatalIfFail("fuseClient.enableDCacheMetrics",
&clientOption->enableDCacheMetrics);

conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);
conf->GetValueFatalIfFail("fuseClient.flushRetryIntervalMs",
&clientOption->flushRetryIntervalMS);
conf->GetValueFatalIfFail("client.dummyserver.startport",
&clientOption->dummyServerStartPort);

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ struct FuseClientOption {
ExtentManagerOption extentManagerOpt;
VolumeOption volumeOpt;

uint32_t flushRetryIntervalMS;
double attrTimeOut;
double entryTimeOut;
uint32_t listDentryLimit;
Expand Down
19 changes: 18 additions & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ using ::curvefs::mds::FSStatusCode_Name;
} \
} while (0)

namespace curvefs {
namespace client {
namespace common {
DECLARE_bool(enableCto);
} // namespace common
} // namespace client
} // namespace curvefs

namespace curvefs {
namespace client {

Expand Down Expand Up @@ -819,8 +827,9 @@ CURVEFS_ERROR FuseClient::FuseOpReadLink(fuse_req_t req, fuse_ino_t ino,
CURVEFS_ERROR FuseClient::FuseOpRelease(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
LOG(INFO) << "FuseOpRelease, ino: " << ino;
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;
std::shared_ptr<InodeWrapper> inodeWrapper;
CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper);
ret = inodeManager_->GetInode(ino, inodeWrapper);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = " << ret
<< ", inodeid = " << ino;
Expand All @@ -830,6 +839,14 @@ CURVEFS_ERROR FuseClient::FuseOpRelease(fuse_req_t req, fuse_ino_t ino,
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();

ret = inodeWrapper->Release();
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager release inode fail, ret = " << ret
<< ", inodeid = " << ino;
return ret;
}
if (::curvefs::client::common::FLAGS_enableCto) {
// inodeManager_->ClearInodeCache(ino);
}
return ret;
}

Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class FuseClient {
int datasync,
struct fuse_file_info* fi) = 0;

virtual CURVEFS_ERROR FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) = 0;

void SetFsInfo(std::shared_ptr<FsInfo> fsInfo) {
fsInfo_ = fsInfo;
init_ = true;
Expand Down
22 changes: 22 additions & 0 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@
#include <memory>
#include <string>

namespace curvefs {
namespace client {
namespace common {
DECLARE_bool(enableCto);
} // namespace common
} // namespace client
} // namespace curvefs

namespace curvefs {
namespace client {

Expand Down Expand Up @@ -241,6 +249,20 @@ CURVEFS_ERROR FuseS3Client::Truncate(Inode *inode, uint64_t length) {
return s3Adaptor_->Truncate(inode, length);
}

CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
if (curvefs::client::common::FLAGS_enableCto) {
// need retry until success
while (CURVEFS_ERROR::OK != FuseOpFsync(req, ino, 0, fi)) {
sleep(option_.flushRetryIntervalMS / 1000);
}

return CURVEFS_ERROR::OK;
} else {
return FuseOpFsync(req, ino, 0, fi);
}
}

void FuseS3Client::FlushData() {
CURVEFS_ERROR ret = CURVEFS_ERROR::UNKNOWN;
do {
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class FuseS3Client : public FuseClient {
CURVEFS_ERROR FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync,
struct fuse_file_info *fi) override;

CURVEFS_ERROR FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) override;

private:
CURVEFS_ERROR Truncate(Inode *inode, uint64_t length) override;

Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/fuse_volume_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ CURVEFS_ERROR FuseVolumeClient::FuseOpFsync(fuse_req_t req, fuse_ino_t ino,
return CURVEFS_ERROR::NOTSUPPORT;
}

CURVEFS_ERROR FuseVolumeClient::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
// TODO(wuhangqin): implement me
return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR FuseVolumeClient::Truncate(Inode *inode, uint64_t length) {
// Todo: call volume truncate
return CURVEFS_ERROR::OK;
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/client/fuse_volume_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class FuseVolumeClient : public FuseClient {
CURVEFS_ERROR FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync,
struct fuse_file_info *fi) override;

CURVEFS_ERROR FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) override;

private:
CURVEFS_ERROR Truncate(Inode *inode, uint64_t length) override;

Expand Down
36 changes: 10 additions & 26 deletions curvefs/src/client/inode_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info,

class UpdateInodeAsyncDone : public MetaServerClientDone {
public:
UpdateInodeAsyncDone(
explicit UpdateInodeAsyncDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper):
inodeWrapper_(inodeWrapper) {}
~UpdateInodeAsyncDone() {}
Expand All @@ -92,7 +92,7 @@ class UpdateInodeAsyncDone : public MetaServerClientDone {

class GetOrModifyS3ChunkInfoAsyncDone : public MetaServerClientDone {
public:
GetOrModifyS3ChunkInfoAsyncDone(
explicit GetOrModifyS3ChunkInfoAsyncDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper):
inodeWrapper_(inodeWrapper) {}
~GetOrModifyS3ChunkInfoAsyncDone() {}
Expand Down Expand Up @@ -282,37 +282,21 @@ CURVEFS_ERROR InodeWrapper::DecreaseNLink() {
}

CURVEFS_ERROR InodeWrapper::Open() {
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;
if (0 == openCount_) {
ret = SetOpenFlag(true);
if (ret != CURVEFS_ERROR::OK) {
return ret;
}
}
openCount_++;
return CURVEFS_ERROR::OK;
openFlag_ = true;
return UpdateInodeStatus(InodeOpenStatusChange::OPEN);
}

bool InodeWrapper::IsOpen() { return openCount_ > 0; }
bool InodeWrapper::IsOpen() { return openFlag_; }

CURVEFS_ERROR InodeWrapper::Release() {
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;
if (1 == openCount_) {
ret = SetOpenFlag(false);
if (ret != CURVEFS_ERROR::OK) {
return ret;
}
}
openCount_--;
return CURVEFS_ERROR::OK;
openFlag_ = false;
return UpdateInodeStatus(InodeOpenStatusChange::CLOSE);
}

CURVEFS_ERROR InodeWrapper::SetOpenFlag(bool flag) {
bool old = inode_.openflag();
inode_.set_openflag(flag);
MetaStatusCode ret = metaClient_->UpdateInode(inode_);
CURVEFS_ERROR
InodeWrapper::UpdateInodeStatus(InodeOpenStatusChange statusChange) {
MetaStatusCode ret = metaClient_->UpdateInode(inode_, statusChange);
if (ret != MetaStatusCode::OK) {
inode_.set_openflag(old);
LOG(ERROR) << "metaClient_ UpdateInode failed, MetaStatusCode = " << ret
<< ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
<< ", inodeid = " << inode_.inodeid();
Expand Down
15 changes: 3 additions & 12 deletions curvefs/src/client/inode_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "src/common/concurrent/concurrent.h"

using ::curvefs::metaserver::Inode;
using ::curvefs::metaserver::InodeOpenStatusChange;
using ::curvefs::metaserver::VolumeExtentList;
using ::curvefs::metaserver::S3ChunkInfoList;
using ::curvefs::metaserver::S3ChunkInfo;
Expand Down Expand Up @@ -63,15 +64,13 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {
InodeWrapper(const Inode &inode,
const std::shared_ptr<MetaServerClient> &metaClient)
: inode_(inode),
openCount_(0),
status_(InodeStatus::Normal),
metaClient_(metaClient),
dirty_(false) {}

InodeWrapper(Inode &&inode,
const std::shared_ptr<MetaServerClient> &metaClient)
: inode_(std::move(inode)),
openCount_(0),
status_(InodeStatus::Normal),
metaClient_(metaClient),
dirty_(false) {}
Expand Down Expand Up @@ -222,14 +221,6 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {

CURVEFS_ERROR Release();

void SetOpenCount(uint32_t openCount) {
openCount_ = openCount;
}

uint32_t GetOpenCount() const {
return openCount_;
}

void MarkDirty() {
dirty_ = true;
}
Expand Down Expand Up @@ -275,11 +266,11 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {
}

private:
CURVEFS_ERROR SetOpenFlag(bool flag);
CURVEFS_ERROR UpdateInodeStatus(InodeOpenStatusChange statusChange);

private:
Inode inode_;
uint32_t openCount_;
bool openFlag_;
InodeStatus status_;

google::protobuf::Map<uint64_t, S3ChunkInfoList> s3ChunkInfoAdd_;
Expand Down
13 changes: 8 additions & 5 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,9 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
return ConvertToMetaStatusCode(excutor.DoRPCTask());
}

MetaStatusCode MetaServerClientImpl::UpdateInode(const Inode &inode) {
MetaStatusCode
MetaServerClientImpl::UpdateInode(const Inode &inode,
InodeOpenStatusChange statusChange) {
auto task = RPCTask {
metaserverClientMetric_->updateInode.qps.count << 1;
UpdateInodeResponse response;
Expand All @@ -452,7 +454,7 @@ MetaStatusCode MetaServerClientImpl::UpdateInode(const Inode &inode) {
request.set_gid(inode.gid());
request.set_mode(inode.mode());
request.set_nlink(inode.nlink());
request.set_openflag(inode.openflag());
request.set_inodeopenstatuschange(statusChange);
if (inode.has_volumeextentlist()) {
curvefs::metaserver::VolumeExtentList *vlist =
new curvefs::metaserver::VolumeExtentList;
Expand Down Expand Up @@ -547,8 +549,9 @@ void UpdateInodeRpcDone::Run() {
return;
}

void MetaServerClientImpl::UpdateInodeAsync(const Inode &inode,
MetaServerClientDone *done) {
void MetaServerClientImpl::UpdateInodeAsync(
const Inode &inode, MetaServerClientDone *done,
InodeOpenStatusChange statusChange) {
auto task = AsyncRPCTask {
metaserverClientMetric_->updateInode.qps.count << 1;

Expand All @@ -566,7 +569,7 @@ void MetaServerClientImpl::UpdateInodeAsync(const Inode &inode,
request.set_gid(inode.gid());
request.set_mode(inode.mode());
request.set_nlink(inode.nlink());
request.set_openflag(inode.openflag());
request.set_inodeopenstatuschange(statusChange);
if (inode.has_volumeextentlist()) {
curvefs::metaserver::VolumeExtentList *vlist =
new curvefs::metaserver::VolumeExtentList;
Expand Down
18 changes: 13 additions & 5 deletions curvefs/src/client/rpcclient/metaserver_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using ::curvefs::client::metric::MetaServerClientMetric;
using ::curvefs::metaserver::Dentry;
using ::curvefs::metaserver::FsFileType;
using ::curvefs::metaserver::Inode;
using ::curvefs::metaserver::InodeOpenStatusChange;
using ::curvefs::metaserver::MetaStatusCode;
using ::curvefs::space::AllocateType;
using ::curvefs::metaserver::S3ChunkInfoList;
Expand Down Expand Up @@ -80,10 +81,14 @@ class MetaServerClient {
virtual MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid,
Inode *out) = 0;

virtual MetaStatusCode UpdateInode(const Inode &inode) = 0;
virtual MetaStatusCode UpdateInode(const Inode &inode,
InodeOpenStatusChange statusChange =
InodeOpenStatusChange::NOCHANGE) = 0;

virtual void UpdateInodeAsync(const Inode &inode,
MetaServerClientDone *done) = 0;
MetaServerClientDone *done,
InodeOpenStatusChange statusChange =
InodeOpenStatusChange::NOCHANGE) = 0;

virtual MetaStatusCode GetOrModifyS3ChunkInfo(
uint32_t fsId, uint64_t inodeId,
Expand Down Expand Up @@ -136,10 +141,13 @@ class MetaServerClientImpl : public MetaServerClient {
MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid,
Inode *out) override;

MetaStatusCode UpdateInode(const Inode &inode) override;
MetaStatusCode UpdateInode(const Inode &inode,
InodeOpenStatusChange statusChange =
InodeOpenStatusChange::NOCHANGE) override;

void UpdateInodeAsync(const Inode &inode,
MetaServerClientDone *done) override;
void UpdateInodeAsync(const Inode &inode, MetaServerClientDone *done,
InodeOpenStatusChange statusChange =
InodeOpenStatusChange::NOCHANGE) override;

MetaStatusCode GetOrModifyS3ChunkInfo(
uint32_t fsId, uint64_t inodeId,
Expand Down
Loading

0 comments on commit f75daad

Please sign in to comment.