Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: list file system in parallel #3769

Merged
merged 18 commits into from
Jun 7, 2023
6 changes: 1 addition & 5 deletions cmd/objbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,11 +588,7 @@ func (bm *benchMarkObj) chtimes(key string, startKey int) error {
}

func listAll(s object.ObjectStorage, prefix, marker string, limit int64) ([]object.Object, error) {
r, err := s.List(prefix, marker, "", limit)
if !errors.Is(err, utils.ENOTSUP) {
return r, err
}
ch, err := s.ListAll(prefix, marker)
ch, err := object.ListAll(s, prefix, marker)
if err == nil {
objs := make([]object.Object, 0)
for obj := range ch {
Expand Down
110 changes: 4 additions & 106 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/juicedata/juicefs/pkg/vfs"
)

var skipDir syscall.Errno = 100000
var dirSuffix = "/"

func toError(eno syscall.Errno) error {
Expand Down Expand Up @@ -207,7 +206,10 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64) ([]object.
dir := j.path(prefix)
var objs []object.Object
if !strings.HasSuffix(dir, dirSuffix) {
dir = path.Dir(dir) + dirSuffix
dir = path.Dir(dir)
if !strings.HasSuffix(dir, dirSuffix) {
dir += dirSuffix
}
} else if marker == "" {
obj, err := j.Head(prefix)
if err != nil {
Expand Down Expand Up @@ -239,58 +241,6 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64) ([]object.
return objs, nil
}

// walk recursively descends path, calling w.
func (j *juiceFS) walk(path string, info *fs.FileStat, isSymlink bool, walkFn WalkFunc) syscall.Errno {
err := walkFn(path, info, isSymlink, 0)
if err != 0 {
if info.IsDir() && err == skipDir {
return 0
}
return err
}

if !info.IsDir() {
return 0
}

entries, err := j.readDirSorted(path)
if err != 0 {
return walkFn(path, info, isSymlink, err)
}

for _, e := range entries {
p := path + e.name
err = j.walk(p, e.fi, e.isSymlink, walkFn)
if err != 0 && err != skipDir && err != syscall.ENOENT {
return err
}
}
return 0
}

func (j *juiceFS) walkRoot(root string, walkFn WalkFunc) syscall.Errno {
var err syscall.Errno
var lstat, info *fs.FileStat
lstat, err = j.jfs.Lstat(ctx, root)
if err != 0 {
err = walkFn(root, nil, false, err)
} else {
isSymlink := lstat.IsSymlink()
info, err = j.jfs.Stat(ctx, root)
if err != 0 {
// root is a broken link
err = walkFn(root, lstat, isSymlink, 0)
} else {
err = j.walk(root, info, isSymlink, walkFn)
}
}

if err == skipDir {
return 0
}
return err
}

type mEntry struct {
fi *fs.FileStat
name string
Expand Down Expand Up @@ -334,58 +284,6 @@ func (j *juiceFS) readDirSorted(dirname string) ([]*mEntry, syscall.Errno) {
return mEntries, err
}

type WalkFunc func(path string, info *fs.FileStat, isSymlink bool, err syscall.Errno) syscall.Errno

func (d *juiceFS) ListAll(prefix, marker string) (<-chan object.Object, error) {
listed := make(chan object.Object, 10240)
var walkRoot string
if strings.HasSuffix(prefix, dirSuffix) {
walkRoot = prefix
} else {
// If the root is not ends with `/`, we'll list the directory root resides.
walkRoot = path.Dir(prefix) + dirSuffix
}
if walkRoot == "./" {
walkRoot = ""
}
go func() {
_ = d.walkRoot(dirSuffix+walkRoot, func(path string, info *fs.FileStat, isSymlink bool, err syscall.Errno) syscall.Errno {
if len(path) > 0 {
path = path[1:]
}
if err != 0 {
if err == syscall.ENOENT {
logger.Warnf("skip not exist file or directory: %s", path)
return 0
}
listed <- nil
logger.Errorf("list %s: %s", path, err)
return 0
}

if !strings.HasPrefix(path, prefix) {
if info.IsDir() && path != walkRoot {
return skipDir
}
return 0
}

key := path
if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) {
if info.IsDir() && !strings.HasPrefix(prefix, key) && !strings.HasPrefix(marker, key) {
return skipDir
}
return 0
}
f := &jObj{key, info}
listed <- f
return 0
})
close(listed)
}()
return listed, nil
}

func (j *juiceFS) Chtimes(key string, mtime time.Time) error {
f, err := j.jfs.Open(ctx, j.path(key), 0)
if err != 0 {
Expand Down
13 changes: 11 additions & 2 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
}
defer m.timeit("GetAttr", time.Now())
var err syscall.Errno
if inode == RootInode {
if inode == RootInode || inode == TrashInode {
// doGetAttr could overwrite the `attr` after timeout
var a Attr
e := utils.WithTimeout(func() error {
Expand All @@ -1261,6 +1261,11 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
attr.Mode = 0777
attr.Nlink = 2
attr.Length = 4 << 10
if inode == TrashInode {
attr.Mode = 0555
}
attr.Parent = RootInode
attr.Full = true
}
} else {
err = m.en.doGetAttr(ctx, inode, attr)
Expand Down Expand Up @@ -1725,7 +1730,11 @@ func (m *baseMeta) Readdir(ctx Context, inode Ino, plus uint8, entries *[]*Entry
Name: []byte(".."),
Attr: &Attr{Typ: TypeDirectory},
})
return m.en.doReaddir(ctx, inode, plus, entries, -1)
st := m.en.doReaddir(ctx, inode, plus, entries, -1)
if st == syscall.ENOENT && inode == TrashInode {
st = 0
}
return st
}

func (m *baseMeta) SetXattr(ctx Context, inode Ino, name string, value []byte, flags uint32) syscall.Errno {
Expand Down
120 changes: 4 additions & 116 deletions pkg/object/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,70 +197,6 @@ func (d *filestore) Delete(key string) error {
return err
}

// walk recursively descends path, calling w.
func walk(path string, info os.FileInfo, isSymlink bool, walkFn WalkFunc) error {
err := walkFn(path, info, isSymlink, nil)
if err != nil {
if info.IsDir() && err == filepath.SkipDir {
return nil
}
return err
}

if !info.IsDir() {
return nil
}

entries, err := readDirSorted(path)
if err != nil {
return walkFn(path, info, isSymlink, err)
}

for _, e := range entries {
p := filepath.Join(path, e.Name())
if e.IsDir() {
p = filepath.ToSlash(p + "/")
}
in, err := e.Info()
if err == nil {
err = walk(p, in, e.isSymlink, walkFn)
}
if err != nil && err != filepath.SkipDir && !os.IsNotExist(err) {
return err
}
}
return nil
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. All errors that arise visiting files
// and directories are filtered by walkFn. The files are walked in lexical
// order, which makes the output deterministic but means that for very
// large directories Walk can be inefficient.
// Walk always follow symbolic links.
func Walk(root string, walkFn WalkFunc) error {
var err error
var lstat, info os.FileInfo
lstat, err = os.Lstat(root)
if err != nil {
err = walkFn(root, nil, false, err)
} else {
isSymlink := lstat.Mode()&os.ModeSymlink != 0
info, err = os.Stat(root)
if err != nil {
// root is a broken link
err = walkFn(root, lstat, isSymlink, nil)
} else {
err = walk(root, info, isSymlink, walkFn)
}
}

if err == filepath.SkipDir {
return nil
}
return err
}

type mEntry struct {
os.DirEntry
name string
Expand Down Expand Up @@ -327,7 +263,10 @@ func (d *filestore) List(prefix, marker, delimiter string, limit int64) ([]Objec
var dir string = d.root + prefix
var objs []Object
if !strings.HasSuffix(dir, dirSuffix) {
dir = path.Dir(dir) + dirSuffix
dir = path.Dir(dir)
if !strings.HasSuffix(dir, dirSuffix) {
dir += dirSuffix
}
} else if marker == "" {
obj, err := d.Head(prefix)
if err != nil {
Expand Down Expand Up @@ -371,57 +310,6 @@ func (d *filestore) List(prefix, marker, delimiter string, limit int64) ([]Objec
return objs, nil
}

type WalkFunc func(path string, info fs.FileInfo, isSymlink bool, err error) error

func (d *filestore) ListAll(prefix, marker string) (<-chan Object, error) {
listed := make(chan Object, 10240)
go func() {
var walkRoot string
if strings.HasSuffix(d.root, dirSuffix) {
walkRoot = d.root
} else {
// If the root is not ends with `/`, we'll list the directory root resides.
walkRoot = path.Dir(d.root)
}

_ = Walk(walkRoot, func(path string, info os.FileInfo, isSymlink bool, err error) error {
if runtime.GOOS == "windows" {
path = strings.Replace(path, "\\", "/", -1)
}

if err != nil {
if os.IsNotExist(err) {
logger.Warnf("skip not exist file or directory: %s", path)
return nil
}
listed <- nil
logger.Errorf("list %s: %s", path, err)
return nil
}

if !strings.HasPrefix(path, d.root) {
if info.IsDir() && path != walkRoot {
return filepath.SkipDir
}
return nil
}

key := path[len(d.root):]
if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) {
if info.IsDir() && !strings.HasPrefix(prefix, key) && !strings.HasPrefix(marker, key) {
return filepath.SkipDir
}
return nil
}
f := d.toFile(key, info, isSymlink)
listed <- f
return nil
})
close(listed)
}()
return listed, nil
}

func (d *filestore) Chtimes(path string, mtime time.Time) error {
p := d.path(path)
return os.Chtimes(p, mtime, mtime)
Expand Down
Loading