diff --git a/cmd/objbench.go b/cmd/objbench.go index 53e6146dbd18..42a0ff088a6b 100644 --- a/cmd/objbench.go +++ b/cmd/objbench.go @@ -588,11 +588,7 @@ func (bm *benchMarkObj) chtimes(key string, startKey int) error { } func listAll(s object.ObjectStorage, prefix, marker string, limit int64) ([]object.Object, error) { - r, err := s.List(prefix, marker, "", limit) - if !errors.Is(err, utils.ENOTSUP) { - return r, err - } - ch, err := s.ListAll(prefix, marker) + ch, err := object.ListAll(s, prefix, marker) if err == nil { objs := make([]object.Object, 0) for obj := range ch { diff --git a/cmd/object.go b/cmd/object.go index eecee7cd34ce..1ec35dc91e7a 100644 --- a/cmd/object.go +++ b/cmd/object.go @@ -40,7 +40,6 @@ import ( "github.com/juicedata/juicefs/pkg/vfs" ) -var skipDir syscall.Errno = 100000 var dirSuffix = "/" func toError(eno syscall.Errno) error { @@ -207,7 +206,10 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64) ([]object. dir := j.path(prefix) var objs []object.Object if !strings.HasSuffix(dir, dirSuffix) { - dir = path.Dir(dir) + dirSuffix + dir = path.Dir(dir) + if !strings.HasSuffix(dir, dirSuffix) { + dir += dirSuffix + } } else if marker == "" { obj, err := j.Head(prefix) if err != nil { @@ -239,58 +241,6 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64) ([]object. return objs, nil } -// walk recursively descends path, calling w. -func (j *juiceFS) walk(path string, info *fs.FileStat, isSymlink bool, walkFn WalkFunc) syscall.Errno { - err := walkFn(path, info, isSymlink, 0) - if err != 0 { - if info.IsDir() && err == skipDir { - return 0 - } - return err - } - - if !info.IsDir() { - return 0 - } - - entries, err := j.readDirSorted(path) - if err != 0 { - return walkFn(path, info, isSymlink, err) - } - - for _, e := range entries { - p := path + e.name - err = j.walk(p, e.fi, e.isSymlink, walkFn) - if err != 0 && err != skipDir && err != syscall.ENOENT { - return err - } - } - return 0 -} - -func (j *juiceFS) walkRoot(root string, walkFn WalkFunc) syscall.Errno { - var err syscall.Errno - var lstat, info *fs.FileStat - lstat, err = j.jfs.Lstat(ctx, root) - if err != 0 { - err = walkFn(root, nil, false, err) - } else { - isSymlink := lstat.IsSymlink() - info, err = j.jfs.Stat(ctx, root) - if err != 0 { - // root is a broken link - err = walkFn(root, lstat, isSymlink, 0) - } else { - err = j.walk(root, info, isSymlink, walkFn) - } - } - - if err == skipDir { - return 0 - } - return err -} - type mEntry struct { fi *fs.FileStat name string @@ -334,58 +284,6 @@ func (j *juiceFS) readDirSorted(dirname string) ([]*mEntry, syscall.Errno) { return mEntries, err } -type WalkFunc func(path string, info *fs.FileStat, isSymlink bool, err syscall.Errno) syscall.Errno - -func (d *juiceFS) ListAll(prefix, marker string) (<-chan object.Object, error) { - listed := make(chan object.Object, 10240) - var walkRoot string - if strings.HasSuffix(prefix, dirSuffix) { - walkRoot = prefix - } else { - // If the root is not ends with `/`, we'll list the directory root resides. - walkRoot = path.Dir(prefix) + dirSuffix - } - if walkRoot == "./" { - walkRoot = "" - } - go func() { - _ = d.walkRoot(dirSuffix+walkRoot, func(path string, info *fs.FileStat, isSymlink bool, err syscall.Errno) syscall.Errno { - if len(path) > 0 { - path = path[1:] - } - if err != 0 { - if err == syscall.ENOENT { - logger.Warnf("skip not exist file or directory: %s", path) - return 0 - } - listed <- nil - logger.Errorf("list %s: %s", path, err) - return 0 - } - - if !strings.HasPrefix(path, prefix) { - if info.IsDir() && path != walkRoot { - return skipDir - } - return 0 - } - - key := path - if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) { - if info.IsDir() && !strings.HasPrefix(prefix, key) && !strings.HasPrefix(marker, key) { - return skipDir - } - return 0 - } - f := &jObj{key, info} - listed <- f - return 0 - }) - close(listed) - }() - return listed, nil -} - func (j *juiceFS) Chtimes(key string, mtime time.Time) error { f, err := j.jfs.Open(ctx, j.path(key), 0) if err != 0 { diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 17cceb5a6896..f2ac74d8e462 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1246,7 +1246,7 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { } defer m.timeit("GetAttr", time.Now()) var err syscall.Errno - if inode == RootInode { + if inode == RootInode || inode == TrashInode { // doGetAttr could overwrite the `attr` after timeout var a Attr e := utils.WithTimeout(func() error { @@ -1261,6 +1261,11 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { attr.Mode = 0777 attr.Nlink = 2 attr.Length = 4 << 10 + if inode == TrashInode { + attr.Mode = 0555 + } + attr.Parent = RootInode + attr.Full = true } } else { err = m.en.doGetAttr(ctx, inode, attr) @@ -1725,7 +1730,11 @@ func (m *baseMeta) Readdir(ctx Context, inode Ino, plus uint8, entries *[]*Entry Name: []byte(".."), Attr: &Attr{Typ: TypeDirectory}, }) - return m.en.doReaddir(ctx, inode, plus, entries, -1) + st := m.en.doReaddir(ctx, inode, plus, entries, -1) + if st == syscall.ENOENT && inode == TrashInode { + st = 0 + } + return st } func (m *baseMeta) SetXattr(ctx Context, inode Ino, name string, value []byte, flags uint32) syscall.Errno { diff --git a/pkg/object/file.go b/pkg/object/file.go index 19db7b76fe59..a6694de1dd76 100644 --- a/pkg/object/file.go +++ b/pkg/object/file.go @@ -197,70 +197,6 @@ func (d *filestore) Delete(key string) error { return err } -// walk recursively descends path, calling w. -func walk(path string, info os.FileInfo, isSymlink bool, walkFn WalkFunc) error { - err := walkFn(path, info, isSymlink, nil) - if err != nil { - if info.IsDir() && err == filepath.SkipDir { - return nil - } - return err - } - - if !info.IsDir() { - return nil - } - - entries, err := readDirSorted(path) - if err != nil { - return walkFn(path, info, isSymlink, err) - } - - for _, e := range entries { - p := filepath.Join(path, e.Name()) - if e.IsDir() { - p = filepath.ToSlash(p + "/") - } - in, err := e.Info() - if err == nil { - err = walk(p, in, e.isSymlink, walkFn) - } - if err != nil && err != filepath.SkipDir && !os.IsNotExist(err) { - return err - } - } - return nil -} - -// Walk walks the file tree rooted at root, calling walkFn for each file or -// directory in the tree, including root. All errors that arise visiting files -// and directories are filtered by walkFn. The files are walked in lexical -// order, which makes the output deterministic but means that for very -// large directories Walk can be inefficient. -// Walk always follow symbolic links. -func Walk(root string, walkFn WalkFunc) error { - var err error - var lstat, info os.FileInfo - lstat, err = os.Lstat(root) - if err != nil { - err = walkFn(root, nil, false, err) - } else { - isSymlink := lstat.Mode()&os.ModeSymlink != 0 - info, err = os.Stat(root) - if err != nil { - // root is a broken link - err = walkFn(root, lstat, isSymlink, nil) - } else { - err = walk(root, info, isSymlink, walkFn) - } - } - - if err == filepath.SkipDir { - return nil - } - return err -} - type mEntry struct { os.DirEntry name string @@ -327,7 +263,10 @@ func (d *filestore) List(prefix, marker, delimiter string, limit int64) ([]Objec var dir string = d.root + prefix var objs []Object if !strings.HasSuffix(dir, dirSuffix) { - dir = path.Dir(dir) + dirSuffix + dir = path.Dir(dir) + if !strings.HasSuffix(dir, dirSuffix) { + dir += dirSuffix + } } else if marker == "" { obj, err := d.Head(prefix) if err != nil { @@ -371,57 +310,6 @@ func (d *filestore) List(prefix, marker, delimiter string, limit int64) ([]Objec return objs, nil } -type WalkFunc func(path string, info fs.FileInfo, isSymlink bool, err error) error - -func (d *filestore) ListAll(prefix, marker string) (<-chan Object, error) { - listed := make(chan Object, 10240) - go func() { - var walkRoot string - if strings.HasSuffix(d.root, dirSuffix) { - walkRoot = d.root - } else { - // If the root is not ends with `/`, we'll list the directory root resides. - walkRoot = path.Dir(d.root) - } - - _ = Walk(walkRoot, func(path string, info os.FileInfo, isSymlink bool, err error) error { - if runtime.GOOS == "windows" { - path = strings.Replace(path, "\\", "/", -1) - } - - if err != nil { - if os.IsNotExist(err) { - logger.Warnf("skip not exist file or directory: %s", path) - return nil - } - listed <- nil - logger.Errorf("list %s: %s", path, err) - return nil - } - - if !strings.HasPrefix(path, d.root) { - if info.IsDir() && path != walkRoot { - return filepath.SkipDir - } - return nil - } - - key := path[len(d.root):] - if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) { - if info.IsDir() && !strings.HasPrefix(prefix, key) && !strings.HasPrefix(marker, key) { - return filepath.SkipDir - } - return nil - } - f := d.toFile(key, info, isSymlink) - listed <- f - return nil - }) - close(listed) - }() - return listed, nil -} - func (d *filestore) Chtimes(path string, mtime time.Time) error { p := d.path(path) return os.Chtimes(p, mtime, mtime) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index c9bc73a95a60..cd84af729f28 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -181,8 +181,11 @@ func (h *hdfsclient) List(prefix, marker, delimiter string, limit int64) ([]Obje } dir := h.path(prefix) var objs []Object - if !strings.HasSuffix(dir, "/") { - dir = filepath.Dir(dir) + dirSuffix + if !strings.HasSuffix(dir, dirSuffix) { + dir = filepath.Dir(dir) + if !strings.HasSuffix(dir, dirSuffix) { + dir += dirSuffix + } } else if marker == "" { obj, err := h.Head(prefix) if err != nil { @@ -237,97 +240,6 @@ func (h *hdfsclient) List(prefix, marker, delimiter string, limit int64) ([]Obje return objs, nil } -func (h *hdfsclient) walk(path string, walkFn filepath.WalkFunc) error { - file, err := h.c.Open(path) - var info os.FileInfo - if file != nil { - info = file.Stat() - } - - err = walkFn(path, info, err) - if err != nil { - if info != nil && info.IsDir() && err == filepath.SkipDir { - return nil - } - - return err - } - - if info == nil || !info.IsDir() { - return nil - } - - infos, err := file.Readdir(0) - if err != nil { - return walkFn(path, info, err) - } - - // make sure they are ordered in full path - names := make([]string, len(infos)) - for i, info := range infos { - if info.IsDir() { - names[i] = info.Name() + "/" - } else { - names[i] = info.Name() - } - } - sort.Strings(names) - - for _, name := range names { - name = strings.TrimSuffix(name, "/") - err = h.walk(filepath.ToSlash(filepath.Join(path, name)), walkFn) - if err != nil { - return err - } - } - - return nil -} - -func (h *hdfsclient) ListAll(prefix, marker string) (<-chan Object, error) { - listed := make(chan Object, 10240) - root := h.path(prefix) - _, err := h.c.Stat(root) - if err != nil && err.(*os.PathError).Err == os.ErrNotExist || !strings.HasSuffix(prefix, "/") { - root = filepath.Dir(root) - } - _, err = h.c.Stat(root) - if err != nil && err.(*os.PathError).Err == os.ErrNotExist { - close(listed) - return listed, nil // return empty list - } - go func() { - _ = h.walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - if err == io.EOF { - err = nil // ignore - } else { - logger.Errorf("list %s: %s", path, err) - listed <- nil - } - return err - } - key := path[len(h.basePath):] - if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) { - if info.IsDir() && !strings.HasPrefix(prefix, key) && !strings.HasPrefix(marker, key) { - return filepath.SkipDir - } - return nil - } - if info.IsDir() { - if path != root || !strings.HasSuffix(root, "/") { - key += "/" - } - } - f := h.toFile(key, info) - listed <- f - return nil - }) - close(listed) - }() - return listed, nil -} - func (h *hdfsclient) Chtimes(key string, mtime time.Time) error { return h.c.Chtimes(h.path(key), mtime, mtime) } diff --git a/pkg/object/mem.go b/pkg/object/mem.go index 463910301eb0..df357bf6f344 100644 --- a/pkg/object/mem.go +++ b/pkg/object/mem.go @@ -189,10 +189,6 @@ func (m *memStore) List(prefix, marker, delimiter string, limit int64) ([]Object return objs, nil } -func (m *memStore) ListAll(prefix, marker string) (<-chan Object, error) { - return nil, notSupported -} - func newMem(endpoint, accesskey, secretkey, token string) (ObjectStorage, error) { store := &memStore{name: endpoint} store.objects = make(map[string]*mobj) diff --git a/pkg/object/object_storage.go b/pkg/object/object_storage.go index a874bdac098f..387672c85da1 100644 --- a/pkg/object/object_storage.go +++ b/pkg/object/object_storage.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "strings" "sync" "time" @@ -179,3 +180,94 @@ var bufPool = sync.Pool{ return &buf }, } + +func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string) (<-chan Object, error) { + entries, err := store.List(prefix, "", "/", 1e9) + if err != nil { + logger.Errorf("list %s: %s", prefix, err) + return nil, err + } + + listed := make(chan Object, 10240) + var walk func(string, []Object) error + walk = func(prefix string, entries []Object) error { + var err error + var concurrent = 10 + ms := make([]sync.Mutex, concurrent) + conds := make([]*utils.Cond, concurrent) + ready := make([]bool, concurrent) + children := make([][]Object, len(entries)) + for c := 0; c < concurrent; c++ { + conds[c] = utils.NewCond(&ms[c]) + go func(c int) { + for i := c; i < len(entries); i += concurrent { + key := entries[i].Key() + if !entries[i].IsDir() || key == prefix { + continue + } + if end != "" && key >= end { + break + } + if key < start && !strings.HasPrefix(start, key) { + continue + } + + children[i], err = store.List(key, "\x00", "/", 1e9) // exclude itself + ms[c].Lock() + ready[c] = true + conds[c].Signal() + ms[c].Unlock() + + ms[c].Lock() + for ready[c] { + conds[c].WaitWithTimeout(time.Second) + if err != nil { + return + } + } + ms[c].Unlock() + } + }(c) + } + + for i, e := range entries { + if end != "" && e.Key() >= end { + return nil + } + if e.Key() >= start { + listed <- e + } else if !strings.HasPrefix(start, e.Key()) { + continue + } + if e.IsDir() && e.Key() != prefix { + c := i % concurrent + ms[c].Lock() + for !ready[c] { + conds[c].Wait() + } + ready[c] = false + conds[c].Signal() + ms[c].Unlock() + if err != nil { + return err + } + + err = walk(e.Key(), children[i]) + if err != nil { + return err + } + children[i] = nil + } + } + return nil + } + + go func() { + defer close(listed) + err := walk(prefix, entries) + if err != nil { + listed <- nil + } + }() + return listed, nil +} diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index 4445ebd918e9..39047c376db1 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -59,11 +59,7 @@ func get(s ObjectStorage, k string, off, limit int64) (string, error) { } func listAll(s ObjectStorage, prefix, marker string, limit int64) ([]Object, error) { - r, err := s.List(prefix, marker, "", limit) - if !errors.Is(err, notSupported) { - return r, err - } - ch, err := s.ListAll(prefix, marker) + ch, err := ListAll(s, prefix, marker) if err == nil { objs := make([]Object, 0) for obj := range ch { @@ -197,7 +193,7 @@ func testStorage(t *testing.T, s ObjectStorage) { t.Fatalf("First object size should be 0, but got %d", objs[0].Size()) } if objs[1].Key() != "test" { - t.Fatalf("First key should be test, but got %s", objs[1].Key()) + t.Fatalf("Second key should be test, but got %s", objs[1].Key()) } if !strings.Contains(s.String(), "encrypted") && objs[1].Size() != 5 { t.Fatalf("Size of first key shold be 5, but got %v", objs[1].Size()) diff --git a/pkg/object/scs.go b/pkg/object/scs.go index 20d91fa1bdbe..c259ab1462b0 100644 --- a/pkg/object/scs.go +++ b/pkg/object/scs.go @@ -136,10 +136,6 @@ func (s *scsClient) List(prefix, marker, delimiter string, limit int64) ([]Objec return objs, nil } -func (s *scsClient) ListAll(prefix, marker string) (<-chan Object, error) { - return nil, notSupported -} - func (s *scsClient) CreateMultipartUpload(key string) (*MultipartUpload, error) { mu, err := s.b.InitiateMultipartUpload(key, map[string]string{}) if err != nil { diff --git a/pkg/object/sftp.go b/pkg/object/sftp.go index 3c189b1f4994..6dc970ebec8d 100644 --- a/pkg/object/sftp.go +++ b/pkg/object/sftp.go @@ -344,62 +344,6 @@ func (f *sftpStore) fileInfo(c *sftp.Client, key string, fi os.FileInfo) Object return ff } -func (f *sftpStore) doFind(c *sftp.Client, path, marker string, out chan Object) { - infos, err := c.ReadDir(path) - if err != nil { - logger.Errorf("readdir %s: %s", path, err) - return - } - - obs := f.sortByName(c, path, infos) - for _, o := range obs { - key := o.Key() - if key > marker { - out <- o - } - if o.IsDir() && (key > marker || strings.HasPrefix(marker, key)) { - f.doFind(c, f.root+key, marker, out) - } - } -} - -func (f *sftpStore) find(c *sftp.Client, path, marker string, out chan Object) { - if strings.HasSuffix(path, dirSuffix) { - fi, err := c.Stat(path) - if err != nil { - logger.Errorf("Stat %s error: %q", path, err) - return - } - if marker == "" { - out <- f.fileInfo(nil, path[len(f.root):], fi) - } - f.doFind(c, path, marker, out) - } else { - // As files or dirs in the same directory of file `path` resides - // may have prefix `path`, we should list the directory. - dir := filepath.Dir(path) + dirSuffix - infos, err := c.ReadDir(dir) - if err != nil { - logger.Errorf("readdir %s: %s", dir, err) - return - } - - obs := f.sortByName(c, dir, infos) - for _, o := range obs { - key := o.Key() - p := f.root + o.Key() - if strings.HasPrefix(p, path) { - if key > marker { - out <- o - } - if o.IsDir() && (key > marker || strings.HasPrefix(marker, key)) { - f.doFind(c, p, marker, out) - } - } - } - } -} - func (f *sftpStore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if delimiter != "/" { return nil, notSupported @@ -414,7 +358,10 @@ func (f *sftpStore) List(prefix, marker, delimiter string, limit int64) ([]Objec var objs []Object dir := f.path(prefix) if !strings.HasSuffix(dir, "/") { - dir = filepath.Dir(dir) + dirSuffix + dir = filepath.Dir(dir) + if !strings.HasSuffix(dir, dirSuffix) { + dir += dirSuffix + } } else if marker == "" { obj, err := f.Head(prefix) if err != nil { @@ -447,21 +394,6 @@ func (f *sftpStore) List(prefix, marker, delimiter string, limit int64) ([]Objec return objs, nil } -func (f *sftpStore) ListAll(prefix, marker string) (<-chan Object, error) { - c, err := f.getSftpConnection() - if err != nil { - return nil, err - } - listed := make(chan Object, 10240) - go func() { - defer f.putSftpConnection(&c, nil) - - f.find(c.sftpClient, f.path(prefix), marker, listed) - close(listed) - }() - return listed, nil -} - func sshInteractive(user, instruction string, questions []string, echos []bool) (answers []string, err error) { if len(questions) == 0 { fmt.Print(user, instruction) diff --git a/pkg/object/sharding.go b/pkg/object/sharding.go index b63d3b41dffe..61cebac54512 100644 --- a/pkg/object/sharding.go +++ b/pkg/object/sharding.go @@ -99,12 +99,16 @@ func ListAll(store ObjectStorage, prefix, marker string) (<-chan Object, error) out := make(chan Object, maxResults) logger.Debugf("Listing objects from %s marker %q", store, marker) objs, err := store.List(prefix, marker, "", maxResults) + if err == notSupported { + return ListAllWithDelimiter(store, prefix, marker, "") + } if err != nil { logger.Errorf("Can't list %s: %s", store, err.Error()) return nil, err } logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime)) go func() { + defer close(out) lastkey := "" first := true END: @@ -139,7 +143,6 @@ func ListAll(store ObjectStorage, prefix, marker string) (<-chan Object, error) } logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime)) } - close(out) }() return out, nil } diff --git a/pkg/object/sql.go b/pkg/object/sql.go index b4a667776ba6..a06a3a20524a 100644 --- a/pkg/object/sql.go +++ b/pkg/object/sql.go @@ -137,7 +137,7 @@ func (s *sqlStore) List(prefix, marker, delimiter string, limit int64) ([]Object return nil, notSupported } var bs []blob - err := s.db.Where("`key` >= ?", []byte(marker)).Limit(int(limit)).Cols("`key`", "size", "modified").OrderBy("`key`").Find(&bs) + err := s.db.Where("`key` > ?", []byte(marker)).Limit(int(limit)).Cols("`key`", "size", "modified").OrderBy("`key`").Find(&bs) if err != nil { return nil, err } diff --git a/pkg/object/webdav.go b/pkg/object/webdav.go index c65b683945b2..7316a3be2a16 100644 --- a/pkg/object/webdav.go +++ b/pkg/object/webdav.go @@ -22,7 +22,6 @@ package object import ( "fmt" "io" - "io/fs" "net/http" "net/url" "os" @@ -167,8 +166,6 @@ func (w *webdav) Copy(dst, src string) error { return w.c.Copy(src, dst, true) } -type WebDAVWalkFunc func(path string, info fs.FileInfo, err error) error - type webDAVFile struct { os.FileInfo name string @@ -178,16 +175,28 @@ func (w webDAVFile) Name() string { return w.name } -func (w *webdav) webdavWalk(path string, info fs.FileInfo, walkFn WebDAVWalkFunc) error { - if !info.IsDir() { - return walkFn(path, info, nil) +func (w *webdav) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "/" { + return nil, notSupported } - infos, err := w.c.ReadDir(path) - err = walkFn(path, info, err) - if err != nil { - return err + + root := "/" + prefix + var objs []Object + if !strings.HasSuffix(root, dirSuffix) { + // If the root is not ends with `/`, we'll list the directory root resides. + root = path.Dir(root) + if !strings.HasSuffix(root, dirSuffix) { + root += dirSuffix + } } + infos, err := w.c.ReadDir(root) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } sortedInfos := make([]os.FileInfo, len(infos)) for idx, o := range infos { if o.IsDir() { @@ -200,75 +209,22 @@ func (w *webdav) webdavWalk(path string, info fs.FileInfo, walkFn WebDAVWalkFunc return sortedInfos[i].Name() < sortedInfos[j].Name() }) for _, info := range sortedInfos { - err := w.webdavWalk(path+info.Name(), info, walkFn) - if err != nil && err != fs.SkipDir { - return err + key := root[1:] + info.Name() + if !strings.HasPrefix(key, prefix) || (marker != "" && key <= marker) { + continue } - } - return nil -} - -func (w *webdav) Walk(root string, fn WebDAVWalkFunc) error { - info, err := w.c.Stat(root) - if err != nil { - err = fn(root, nil, err) - } else { - err = w.webdavWalk(root, info, fn) - } - return err -} - -func (w *webdav) ListAll(prefix, marker string) (<-chan Object, error) { - if !strings.HasPrefix(prefix, dirSuffix) { - prefix = dirSuffix + prefix - } - if marker != "" && !strings.HasPrefix(marker, dirSuffix) { - marker = dirSuffix + marker - } - var root string - if strings.HasSuffix(prefix, dirSuffix) { - root = prefix - } else { - // If the root is not ends with `/`, we'll list the directory root resides. - root = path.Dir(prefix) - if !strings.HasSuffix(root, dirSuffix) { - root += dirSuffix + objs = append(objs, &obj{ + key, + info.Size(), + info.ModTime(), + info.IsDir(), + "", + }) + if len(objs) == int(limit) { + break } } - - listed := make(chan Object, 10240) - go func() { - defer close(listed) - _ = w.Walk(root, func(path string, info fs.FileInfo, err error) error { - if err != nil { - if gowebdav.IsErrNotFound(err) { - logger.Warnf("skip not exist file or directory: %s", path) - return nil - } - listed <- nil - logger.Errorf("list %s: %s", path, err) - return err - } - if info.IsDir() { - if !strings.HasPrefix(path, prefix) && path != root || path < marker && !strings.HasPrefix(marker, path) { - return fs.SkipDir - } - return nil - } - if !strings.HasPrefix(path, prefix) || (marker != "" && path <= marker) { - return nil - } - listed <- &obj{ - path[1:], - info.Size(), - info.ModTime(), - false, - "", - } - return nil - }) - }() - return listed, nil + return objs, nil } func newWebDAV(endpoint, user, passwd, token string) (ObjectStorage, error) { diff --git a/pkg/sync/download_test.go b/pkg/sync/download_test.go index bc2808105fe2..dff719423c13 100644 --- a/pkg/sync/download_test.go +++ b/pkg/sync/download_test.go @@ -85,7 +85,7 @@ func TestDownload(t *testing.T) { } n, err = pr.Read(res) if err != io.EOF || n != 0 { - t.Fatalf("err should be io.EOF or n should equal 0") + t.Fatalf("err should be io.EOF or n should equal 0, but got %s %d", err, n) } }}, @@ -102,7 +102,7 @@ func TestDownload(t *testing.T) { } n, err = pr.Read(res) if err != io.EOF || n != 0 { - t.Fatalf("err should be io.EOF or n should equal 0") + t.Fatalf("err should be io.EOF or n should equal 0, but got %s %d", err, n) } }}, @@ -120,7 +120,7 @@ func TestDownload(t *testing.T) { } n, err = pr.Read(res) if err != io.EOF || n != 0 { - t.Fatalf("err should be io.EOF or n should equal 0") + t.Fatalf("err should be io.EOF or n should equal 0, but got %s %d", err, n) } }}, @@ -149,7 +149,7 @@ func TestDownload(t *testing.T) { res := make([]byte, 1) n, err := pr.Read(res) if err != io.EOF || n != 0 { - t.Fatalf("err should be io.EOF or n should equal 0") + t.Fatalf("err should be io.EOF or n should equal 0, but got %s %d", err, n) } }}, @@ -159,7 +159,7 @@ func TestDownload(t *testing.T) { pr.key = "notExist" n, err := pr.Read(res) if !os.IsNotExist(err) || n != 0 { - t.Fatalf("err should be ErrNotExist or n should equal 0") + t.Fatalf("err should be ErrNotExist or n should equal 0, but got %s %d", err, n) } }}, } diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index d060013d7489..dcb90b38618a 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -89,19 +89,9 @@ func ListAll(store object.ObjectStorage, prefix, start, end string) (<-chan obje } if ch, err := store.ListAll(prefix, start); err == nil { - if end == "" { - go func() { - for obj := range ch { - out <- obj - } - close(out) - }() - return out, nil - } - go func() { for obj := range ch { - if obj != nil && obj.Key() > end { + if obj != nil && end != "" && obj.Key() > end { break } out <- obj @@ -116,6 +106,9 @@ func ListAll(store object.ObjectStorage, prefix, start, end string) (<-chan obje marker := start logger.Debugf("Listing objects from %s marker %q", store, marker) objs, err := store.List(prefix, marker, "", maxResults) + if err == utils.ENOTSUP { + return object.ListAllWithDelimiter(store, prefix, start, end) + } if err != nil { logger.Errorf("Can't list %s: %s", store, err.Error()) return nil, err diff --git a/pkg/sync/sync_test.go b/pkg/sync/sync_test.go index b90cf63a560f..fd7f7e83d5d8 100644 --- a/pkg/sync/sync_test.go +++ b/pkg/sync/sync_test.go @@ -148,8 +148,8 @@ func TestSync(t *testing.T) { t.Fatalf("should copy 1 keys, but got %d", c) } // Now a: {"a1", "a2", "abc", "ba", "c1", "c2"}, b: {"a1", "a2", "ba"} - aRes, _ := a.ListAll("", "") - bRes, _ := b.ListAll("", "") + aRes, _ := ListAll(a, "", "", "") + bRes, _ := ListAll(b, "", "", "") var aObjs, bObjs []object.Object for obj := range aRes { @@ -164,7 +164,7 @@ func TestSync(t *testing.T) { } if !deepEqualWithOutMtime(aObjs[4], bObjs[len(bObjs)-1]) { - t.Fatalf("expect %+v but got %+v", aObjs[3], bObjs[len(bObjs)-1]) + t.Fatalf("expect %+v but got %+v", aObjs[4], bObjs[len(bObjs)-1]) } // Test --force-update option config.ForceUpdate = true @@ -249,7 +249,7 @@ func TestSyncIncludeAndExclude(t *testing.T) { t.Fatalf("sync: %s", err) } - bRes, _ := b.ListAll("", "") + bRes, _ := ListAll(b, "", "", "") var bKeys []string for obj := range bRes { bKeys = append(bKeys, obj.Key()) @@ -481,7 +481,7 @@ func TestLimits(t *testing.T) { t.Fatalf("sync: %s", err) } - all, err := tcase.dst.ListAll("", "") + all, err := ListAll(tcase.dst, "", "", "") if err != nil { t.Fatalf("list all b: %s", err) }