diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index da508a40777b..9a37669efdd6 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -30,7 +30,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.29 + version: v1.49.0 build: runs-on: ubuntu-latest steps: @@ -79,4 +79,4 @@ jobs: run: | wget https://github.com/apple/foundationdb/releases/download/6.3.23/foundationdb-clients_6.3.23-1_amd64.deb sudo dpkg -i foundationdb-clients_6.3.23-1_amd64.deb - GOPATH=$HOME/go make juicefs.fdb \ No newline at end of file + GOPATH=$HOME/go make juicefs.fdb diff --git a/cmd/objbench.go b/cmd/objbench.go index 904155147d84..ec6bcaa0b255 100644 --- a/cmd/objbench.go +++ b/cmd/objbench.go @@ -562,7 +562,7 @@ func (bm *benchMarkObj) chtimes(key string, startKey int) error { } func listAll(s object.ObjectStorage, prefix, marker string, limit int64) ([]object.Object, error) { - r, err := s.List(prefix, marker, limit) + r, err := s.List(prefix, marker, "", limit) if err == nil { return r, nil } diff --git a/pkg/object/azure.go b/pkg/object/azure.go index 61aefe9698f0..988fff4b2dca 100644 --- a/pkg/object/azure.go +++ b/pkg/object/azure.go @@ -93,7 +93,11 @@ func (b *wasb) Delete(key string) error { return err } -func (b *wasb) List(prefix, marker string, limit int64) ([]Object, error) { +func (b *wasb) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } + // todo if marker != "" { if b.marker == "" { // last page diff --git a/pkg/object/b2.go b/pkg/object/b2.go index 63bf95beb722..22efc4227a02 100644 --- a/pkg/object/b2.go +++ b/pkg/object/b2.go @@ -114,7 +114,7 @@ func (c *b2client) Delete(key string) error { return err } -func (c *b2client) List(prefix, marker string, limit int64) ([]Object, error) { +func (c *b2client) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } @@ -122,7 +122,7 @@ func (c *b2client) List(prefix, marker string, limit int64) ([]Object, error) { marker = c.nextMarker c.nextMarker = "" } - resp, err := c.bucket.ListFileNamesWithPrefix(marker, int(limit), prefix, "") + resp, err := c.bucket.ListFileNamesWithPrefix(marker, int(limit), prefix, delimiter) if err != nil { return nil, err } diff --git a/pkg/object/bos.go b/pkg/object/bos.go index 299c52c36e14..07e48cc02fbe 100644 --- a/pkg/object/bos.go +++ b/pkg/object/bos.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "os" + "sort" "strings" "time" @@ -112,12 +113,12 @@ func (q *bosclient) Delete(key string) error { return err } -func (q *bosclient) List(prefix, marker string, limit int64) ([]Object, error) { +func (q *bosclient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } limit_ := int(limit) - out, err := q.c.SimpleListObjects(q.bucket, prefix, limit_, marker, "") + out, err := q.c.SimpleListObjects(q.bucket, prefix, limit_, marker, delimiter) if err != nil { return nil, err } @@ -128,6 +129,12 @@ func (q *bosclient) List(prefix, marker string, limit int64) ([]Object, error) { mod, _ := time.Parse("2006-01-02T15:04:05Z", k.LastModified) objs[i] = &obj{k.Key, int64(k.Size), mod, strings.HasSuffix(k.Key, "/")} } + if delimiter != "" { + for _, p := range out.CommonPrefixes { + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/cos.go b/pkg/object/cos.go index 2557286ba1cf..e02a3061019e 100644 --- a/pkg/object/cos.go +++ b/pkg/object/cos.go @@ -26,6 +26,7 @@ import ( "net/http" "net/url" "os" + "sort" "strconv" "strings" "time" @@ -119,11 +120,12 @@ func (c *COS) Delete(key string) error { return err } -func (c *COS) List(prefix, marker string, limit int64) ([]Object, error) { +func (c *COS) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { param := cos.BucketGetOptions{ - Prefix: prefix, - Marker: marker, - MaxKeys: int(limit), + Prefix: prefix, + Marker: marker, + MaxKeys: int(limit), + Delimiter: delimiter, } resp, _, err := c.c.Bucket.Get(ctx, ¶m) for err == nil && len(resp.Contents) == 0 && resp.IsTruncated { @@ -140,6 +142,12 @@ func (c *COS) List(prefix, marker string, limit int64) ([]Object, error) { t, _ := time.Parse(time.RFC3339, o.LastModified) objs[i] = &obj{o.Key, int64(o.Size), t, strings.HasSuffix(o.Key, "/")} } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{p, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/etcd.go b/pkg/object/etcd.go index db738c7c455d..2fa22a2edfc2 100644 --- a/pkg/object/etcd.go +++ b/pkg/object/etcd.go @@ -111,7 +111,10 @@ func genNextKey(key string) string { return string(next) } -func (c *etcdClient) List(prefix, marker string, limit int64) ([]Object, error) { +func (c *etcdClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } if marker == "" { marker = prefix } diff --git a/pkg/object/file.go b/pkg/object/file.go index c1272070247d..74672bbf1097 100644 --- a/pkg/object/file.go +++ b/pkg/object/file.go @@ -319,7 +319,7 @@ func readDirSorted(dirname string) ([]*mEntry, error) { return mEntries, err } -func (d *filestore) List(prefix, marker string, limit int64) ([]Object, error) { +func (d *filestore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { return nil, notSupported } diff --git a/pkg/object/gs.go b/pkg/object/gs.go index e15997f9c54f..63ee7cabe54e 100644 --- a/pkg/object/gs.go +++ b/pkg/object/gs.go @@ -25,7 +25,9 @@ import ( "io" "net/url" "os" + "sort" "strings" + "time" "cloud.google.com/go/compute/metadata" "cloud.google.com/go/storage" @@ -48,7 +50,7 @@ func (g *gs) String() string { func (g *gs) Create() error { // check if the bucket is already exists - if objs, err := g.List("", "", 1); err == nil && len(objs) > 0 { + if objs, err := g.List("", "", "", 1); err == nil && len(objs) > 0 { return nil } @@ -135,12 +137,12 @@ func (g *gs) Delete(key string) error { return nil } -func (g *gs) List(prefix, marker string, limit int64) ([]Object, error) { +func (g *gs) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if marker != "" && g.pageToken == "" { // last page return nil, nil } - objectIterator := g.client.Bucket(g.bucket).Objects(ctx, &storage.Query{Prefix: prefix}) + objectIterator := g.client.Bucket(g.bucket).Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: delimiter}) pager := iterator.NewPager(objectIterator, int(limit), g.pageToken) var entries []*storage.ObjectAttrs nextPageToken, err := pager.NextPage(&entries) @@ -152,7 +154,14 @@ func (g *gs) List(prefix, marker string, limit int64) ([]Object, error) { objs := make([]Object, n) for i := 0; i < n; i++ { item := entries[i] - objs[i] = &obj{item.Name, item.Size, item.Updated, strings.HasSuffix(item.Name, "/")} + if delimiter != "" && item.Prefix != "" { + objs[i] = &obj{item.Prefix, 0, time.Unix(0, 0), true} + } else { + objs[i] = &obj{item.Name, item.Size, item.Updated, strings.HasSuffix(item.Name, "/")} + } + } + if delimiter != "" { + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } return objs, nil } diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index c4bfcf4ea87a..1447cd65f3f2 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -177,7 +177,7 @@ func (h *hdfsclient) Delete(key string) error { return err } -func (h *hdfsclient) List(prefix, marker string, limit int64) ([]Object, error) { +func (h *hdfsclient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { return nil, notSupported } diff --git a/pkg/object/ibmcos.go b/pkg/object/ibmcos.go index 25cf499a751f..5374fbe23999 100644 --- a/pkg/object/ibmcos.go +++ b/pkg/object/ibmcos.go @@ -27,7 +27,9 @@ import ( "net/http" "net/url" "os" + "sort" "strings" + "time" "github.com/IBM/ibm-cos-sdk-go/aws" "github.com/IBM/ibm-cos-sdk-go/aws/awserr" @@ -134,12 +136,13 @@ func (s *ibmcos) Delete(key string) error { return err } -func (s *ibmcos) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *ibmcos) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { param := s3.ListObjectsInput{ - Bucket: &s.bucket, - Prefix: &prefix, - Marker: &marker, - MaxKeys: &limit, + Bucket: &s.bucket, + Prefix: &prefix, + Marker: &marker, + MaxKeys: &limit, + Delimiter: &delimiter, } resp, err := s.s3.ListObjects(¶m) if err != nil { @@ -151,6 +154,12 @@ func (s *ibmcos) List(prefix, marker string, limit int64) ([]Object, error) { o := resp.Contents[i] objs[i] = &obj{*o.Key, *o.Size, *o.LastModified, strings.HasSuffix(*o.Key, "/")} } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{*p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/interface.go b/pkg/object/interface.go index ab15d58c6ecd..c64afe2aec66 100644 --- a/pkg/object/interface.go +++ b/pkg/object/interface.go @@ -77,7 +77,7 @@ type ObjectStorage interface { // Head returns some information about the object or an error if not found. Head(key string) (Object, error) // List returns a list of objects. - List(prefix, marker string, limit int64) ([]Object, error) + List(prefix, marker, delimiter string, limit int64) ([]Object, error) // ListAll returns all the objects as an channel. ListAll(prefix, marker string) (<-chan Object, error) diff --git a/pkg/object/ks3.go b/pkg/object/ks3.go index e17bea9ceef0..a42e6dc22243 100644 --- a/pkg/object/ks3.go +++ b/pkg/object/ks3.go @@ -27,7 +27,9 @@ import ( "net/http" "net/url" "os" + "sort" "strings" + "time" "github.com/aws/aws-sdk-go/aws/session" "github.com/juicedata/juicefs/pkg/utils" @@ -139,12 +141,13 @@ func (s *ks3) Delete(key string) error { return err } -func (s *ks3) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *ks3) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { param := s3.ListObjectsInput{ - Bucket: &s.bucket, - Prefix: &prefix, - Marker: &marker, - MaxKeys: &limit, + Bucket: &s.bucket, + Prefix: &prefix, + Marker: &marker, + MaxKeys: &limit, + Delimiter: &delimiter, } resp, err := s.s3.ListObjects(¶m) if err != nil { @@ -156,6 +159,12 @@ func (s *ks3) List(prefix, marker string, limit int64) ([]Object, error) { o := resp.Contents[i] objs[i] = &obj{*o.Key, *o.Size, *o.LastModified, strings.HasSuffix(*o.Key, "/")} } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{*p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/mem.go b/pkg/object/mem.go index eddd04c004de..17eee54a1887 100644 --- a/pkg/object/mem.go +++ b/pkg/object/mem.go @@ -129,7 +129,10 @@ func (m *memStore) Delete(key string) error { return nil } -func (m *memStore) List(prefix, marker string, limit int64) ([]Object, error) { +func (m *memStore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } m.Lock() defer m.Unlock() diff --git a/pkg/object/nos.go b/pkg/object/nos.go index 20f0c16c73df..0b6404589c43 100644 --- a/pkg/object/nos.go +++ b/pkg/object/nos.go @@ -27,6 +27,7 @@ import ( "net/http" "net/url" "os" + "sort" "strings" "time" @@ -130,12 +131,13 @@ func (s *nos) Delete(key string) error { return s.client.DeleteObject(¶m) } -func (s *nos) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *nos) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { param := model.ListObjectsRequest{ - Bucket: s.bucket, - Prefix: prefix, - Marker: marker, - MaxKeys: int(limit), + Bucket: s.bucket, + Prefix: prefix, + Marker: marker, + MaxKeys: int(limit), + Delimiter: delimiter, } resp, err := s.client.ListObjects(¶m) if err != nil { @@ -151,6 +153,12 @@ func (s *nos) List(prefix, marker string, limit int64) ([]Object, error) { } objs[i] = &obj{o.Key, o.Size, mtime, strings.HasSuffix(o.Key, "/")} } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/object_storage.go b/pkg/object/object_storage.go index 137351a9c27a..d57ff19f36f6 100644 --- a/pkg/object/object_storage.go +++ b/pkg/object/object_storage.go @@ -18,6 +18,7 @@ package object import ( "context" + "errors" "fmt" "io" "os" @@ -103,6 +104,7 @@ type FileSystem interface { } var notSupported = utils.ENOTSUP +var notSupportedDelimiter = errors.New("not supported delimiter") type DefaultObjectStorage struct{} @@ -132,7 +134,7 @@ func (s DefaultObjectStorage) ListUploads(marker string) ([]*PendingPart, string return nil, "", nil } -func (s DefaultObjectStorage) List(prefix, marker string, limit int64) ([]Object, error) { +func (s DefaultObjectStorage) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { return nil, notSupported } diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index 99c900fe71b4..ec0c746c24b2 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -49,7 +49,7 @@ func get(s ObjectStorage, k string, off, limit int64) (string, error) { } func listAll(s ObjectStorage, prefix, marker string, limit int64) ([]Object, error) { - r, err := s.List(prefix, marker, limit) + r, err := s.List(prefix, marker, "", limit) if !errors.Is(err, notSupported) { return r, nil } @@ -172,6 +172,52 @@ func testStorage(t *testing.T, s ObjectStorage) { } } + defer s.Delete("a/a") + if err := s.Put("a/a", bytes.NewReader(br)); err != nil { + t.Fatalf("PUT failed: %s", err.Error()) + } + defer s.Delete("a/a1") + if err := s.Put("a/a1", bytes.NewReader(br)); err != nil { + t.Fatalf("PUT failed: %s", err.Error()) + } + defer s.Delete("b/b") + if err := s.Put("b/b", bytes.NewReader(br)); err != nil { + t.Fatalf("PUT failed: %s", err.Error()) + } + defer s.Delete("b/b1") + if err := s.Put("b/b1", bytes.NewReader(br)); err != nil { + t.Fatalf("PUT failed: %s", err.Error()) + } + defer s.Delete("c/") + //tikv will appear empty value is not supported + if err1 := s.Put("c/", bytes.NewReader(nil)); err1 != nil { + //minio will appear XMinioObjectExistsAsDirectory: Object name already exists as a directory. status code: 409 + if err2 := s.Put("c/", bytes.NewReader(br)); err2 != nil { + t.Fatalf("PUT failed err1: %s, err2: %s", err1.Error(), err2.Error()) + } + } + defer s.Delete("a1") + if err := s.Put("a1", bytes.NewReader(br)); err != nil { + t.Fatalf("PUT failed: %s", err.Error()) + } + if obs, err := s.List("", "", "/", 10); err != nil { + if !(errors.Is(err, notSupportedDelimiter) || errors.Is(err, notSupported)) { + t.Fatalf("list with delimiter: %s", err) + } else { + t.Logf("list api error: %s", err) + } + } else { + if len(obs) != 5 { + t.Fatalf("list with delimiter should return five results but got %d", len(obs)) + } + keys := []string{"a/", "a1", "b/", "c/", "test"} + for i, o := range obs { + if o.Key() != keys[i] { + t.Fatalf("should get key %s but got %s", keys[i], o.Key()) + } + } + } + // test redis cluster list all api keyTotal := 100 var sortedKeys []string @@ -297,6 +343,7 @@ func TestMem(t *testing.T) { } func TestDisk(t *testing.T) { + _ = os.RemoveAll("/tmp/abc/") s, _ := newDisk("/tmp/abc/", "", "", "") testStorage(t, s) } diff --git a/pkg/object/obs.go b/pkg/object/obs.go index d341a60dfc55..bb200b4c1826 100644 --- a/pkg/object/obs.go +++ b/pkg/object/obs.go @@ -29,7 +29,9 @@ import ( "net/http" "net/url" "os" + "sort" "strings" + "time" "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" "github.com/juicedata/juicefs/pkg/utils" @@ -157,13 +159,14 @@ func (s *obsClient) Delete(key string) error { return err } -func (s *obsClient) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *obsClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { input := &obs.ListObjectsInput{ Bucket: s.bucket, Marker: marker, } input.Prefix = prefix input.MaxKeys = int(limit) + input.Delimiter = delimiter resp, err := s.c.ListObjects(input) if err != nil { return nil, err @@ -174,6 +177,12 @@ func (s *obsClient) List(prefix, marker string, limit int64) ([]Object, error) { o := resp.Contents[i] objs[i] = &obj{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/")} } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{p, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/oos.go b/pkg/object/oos.go index 7887823fca63..18137f512b98 100644 --- a/pkg/object/oos.go +++ b/pkg/object/oos.go @@ -39,18 +39,18 @@ func (s *oos) String() string { } func (s *oos) Create() error { - _, err := s.List("", "", 1) + _, err := s.List("", "", "", 1) if err != nil { return fmt.Errorf("please create bucket %s manually", s.s3client.bucket) } return err } -func (s *oos) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *oos) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } - objs, err := s.s3client.List(prefix, marker, limit) + objs, err := s.s3client.List(prefix, marker, delimiter, limit) if marker != "" && len(objs) > 0 && objs[0].Key() == marker { objs = objs[1:] } diff --git a/pkg/object/oss.go b/pkg/object/oss.go index fcce105e15af..48921cd4cfef 100644 --- a/pkg/object/oss.go +++ b/pkg/object/oss.go @@ -29,6 +29,7 @@ import ( "net/http" "net/url" "os" + "sort" "strconv" "strings" "time" @@ -128,12 +129,12 @@ func (o *ossClient) Delete(key string) error { return o.checkError(o.bucket.DeleteObject(key)) } -func (o *ossClient) List(prefix, marker string, limit int64) ([]Object, error) { +func (o *ossClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } result, err := o.bucket.ListObjects(oss.Prefix(prefix), - oss.Marker(marker), oss.MaxKeys(int(limit))) + oss.Marker(marker), oss.Delimiter(delimiter), oss.MaxKeys(int(limit))) if o.checkError(err) != nil { return nil, err } @@ -143,6 +144,12 @@ func (o *ossClient) List(prefix, marker string, limit int64) ([]Object, error) { o := result.Objects[i] objs[i] = &obj{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/")} } + if delimiter != "" { + for _, o := range result.CommonPrefixes { + objs = append(objs, &obj{o, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/prefix.go b/pkg/object/prefix.go index c0acc170bc46..429acfa3e50b 100644 --- a/pkg/object/prefix.go +++ b/pkg/object/prefix.go @@ -81,11 +81,11 @@ func (p *withPrefix) Delete(key string) error { return p.os.Delete(p.prefix + key) } -func (p *withPrefix) List(prefix, marker string, limit int64) ([]Object, error) { +func (p *withPrefix) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if marker != "" { marker = p.prefix + marker } - objs, err := p.os.List(p.prefix+prefix, marker, limit) + objs, err := p.os.List(p.prefix+prefix, marker, delimiter, limit) ln := len(p.prefix) for _, o := range objs { switch p := o.(type) { diff --git a/pkg/object/qingstor.go b/pkg/object/qingstor.go index 56f2491ae15c..a651ff629a69 100644 --- a/pkg/object/qingstor.go +++ b/pkg/object/qingstor.go @@ -27,6 +27,7 @@ import ( "net/http" "net/url" "os" + "sort" "strings" "time" @@ -161,15 +162,16 @@ func (q *qingstor) Delete(key string) error { return err } -func (q *qingstor) List(prefix, marker string, limit int64) ([]Object, error) { +func (q *qingstor) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } limit_ := int(limit) input := &qs.ListObjectsInput{ - Prefix: &prefix, - Marker: &marker, - Limit: &limit_, + Prefix: &prefix, + Marker: &marker, + Limit: &limit_, + Delimiter: &delimiter, } out, err := q.bucket.ListObjects(input) if err != nil { @@ -186,6 +188,12 @@ func (q *qingstor) List(prefix, marker string, limit int64) ([]Object, error) { strings.HasSuffix(*k.Key, "/"), } } + if delimiter != "" { + for _, p := range out.CommonPrefixes { + objs = append(objs, &obj{*p, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/qiniu.go b/pkg/object/qiniu.go index b107697923d8..17f4b7ae25b3 100644 --- a/pkg/object/qiniu.go +++ b/pkg/object/qiniu.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "os" + "sort" "strings" "time" @@ -134,7 +135,7 @@ func (q *qiniu) Delete(key string) error { return err } -func (q *qiniu) List(prefix, marker string, limit int64) ([]Object, error) { +func (q *qiniu) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 1000 { limit = 1000 } @@ -144,9 +145,9 @@ func (q *qiniu) List(prefix, marker string, limit int64) ([]Object, error) { // last page return nil, nil } - entries, _, markerOut, hasNext, err := q.bm.ListFiles(q.bucket, prefix, "", q.marker, int(limit)) + entries, prefixes, markerOut, hasNext, err := q.bm.ListFiles(q.bucket, prefix, delimiter, q.marker, int(limit)) for err == nil && len(entries) == 0 && hasNext { - entries, _, markerOut, hasNext, err = q.bm.ListFiles(q.bucket, prefix, "", markerOut, int(limit)) + entries, prefixes, markerOut, hasNext, err = q.bm.ListFiles(q.bucket, prefix, delimiter, markerOut, int(limit)) } q.marker = markerOut if len(entries) > 0 || err == io.EOF { @@ -163,6 +164,12 @@ func (q *qiniu) List(prefix, marker string, limit int64) ([]Object, error) { mtime := entry.PutTime / 10000000 objs[i] = &obj{entry.Key, entry.Fsize, time.Unix(mtime, 0), strings.HasSuffix(entry.Key, "/")} } + if delimiter != "" { + for _, p := range prefixes { + objs = append(objs, &obj{p, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/restful.go b/pkg/object/restful.go index 227b36b18fde..176a4ae2c06a 100644 --- a/pkg/object/restful.go +++ b/pkg/object/restful.go @@ -235,7 +235,7 @@ func (s *RestfulStorage) Delete(key string) error { return nil } -func (s *RestfulStorage) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *RestfulStorage) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { return nil, notSupported } diff --git a/pkg/object/s3.go b/pkg/object/s3.go index e618232a39f1..aa1832e09e91 100644 --- a/pkg/object/s3.go +++ b/pkg/object/s3.go @@ -28,7 +28,9 @@ import ( "net/url" "os" "regexp" + "sort" "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -67,7 +69,7 @@ func isExists(err error) bool { } func (s *s3client) Create() error { - if _, err := s.List("", "", 1); err == nil { + if _, err := s.List("", "", "", 1); err == nil { return nil } _, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket}) @@ -168,12 +170,13 @@ func (s *s3client) Delete(key string) error { return err } -func (s *s3client) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *s3client) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { param := s3.ListObjectsInput{ - Bucket: &s.bucket, - Prefix: &prefix, - Marker: &marker, - MaxKeys: &limit, + Bucket: &s.bucket, + Prefix: &prefix, + Marker: &marker, + MaxKeys: &limit, + Delimiter: &delimiter, } resp, err := s.s3.ListObjects(¶m) if err != nil { @@ -193,6 +196,12 @@ func (s *s3client) List(prefix, marker string, limit int64) ([]Object, error) { strings.HasSuffix(*o.Key, "/"), } } + if delimiter != "" { + for _, p := range resp.CommonPrefixes { + objs = append(objs, &obj{*p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/scs.go b/pkg/object/scs.go index 351e4fc4b069..3145ad321114 100644 --- a/pkg/object/scs.go +++ b/pkg/object/scs.go @@ -89,7 +89,7 @@ func (s *scsClient) Delete(key string) error { return s.b.Delete(key) } -func (s *scsClient) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *scsClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if marker != "" { if s.marker == "" { // last page @@ -97,7 +97,7 @@ func (s *scsClient) List(prefix, marker string, limit int64) ([]Object, error) { } marker = s.marker } - list, err := s.b.List("", prefix, marker, limit) + list, err := s.b.List(delimiter, prefix, marker, limit) if err != nil { s.marker = "" return nil, err @@ -119,6 +119,12 @@ func (s *scsClient) List(prefix, marker string, limit int64) ([]Object, error) { isDir: strings.HasSuffix(ob.Name, "/"), } } + if delimiter != "" { + for _, p := range list.CommonPrefixes { + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + } + sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) + } return objs, nil } diff --git a/pkg/object/sftp.go b/pkg/object/sftp.go index c9f4f02799aa..37cc609f801d 100644 --- a/pkg/object/sftp.go +++ b/pkg/object/sftp.go @@ -390,7 +390,7 @@ func (f *sftpStore) find(c *sftp.Client, path, marker string, out chan Object) { } } -func (f *sftpStore) List(prefix, marker string, limit int64) ([]Object, error) { +func (f *sftpStore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { return nil, notSupported } diff --git a/pkg/object/sharding.go b/pkg/object/sharding.go index 8fea94e198cf..62107c3f190f 100644 --- a/pkg/object/sharding.go +++ b/pkg/object/sharding.go @@ -80,7 +80,7 @@ func ListAll(store ObjectStorage, prefix, marker string) (<-chan Object, error) startTime := time.Now() out := make(chan Object, maxResults) logger.Debugf("Listing objects from %s marker %q", store, marker) - objs, err := store.List(prefix, marker, maxResults) + objs, err := store.List(prefix, marker, "", maxResults) if err != nil { logger.Errorf("Can't list %s: %s", store, err.Error()) return nil, err @@ -112,12 +112,12 @@ func ListAll(store ObjectStorage, prefix, marker string) (<-chan Object, error) marker = lastkey startTime = time.Now() logger.Debugf("Continue listing objects from %s marker %q", store, marker) - objs, err = store.List(prefix, marker, maxResults) + objs, err = store.List(prefix, marker, "", maxResults) for err != nil { logger.Warnf("Fail to list: %s, retry again", err.Error()) // slow down time.Sleep(time.Millisecond * 100) - objs, err = store.List(prefix, marker, maxResults) + objs, err = store.List(prefix, marker, "", maxResults) } logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime)) } diff --git a/pkg/object/speedy.go b/pkg/object/speedy.go index fc0c16fa52d6..6a24e17cbc93 100644 --- a/pkg/object/speedy.go +++ b/pkg/object/speedy.go @@ -45,7 +45,7 @@ type ListBucketResult struct { Marker string MaxKeys string NextMarker string - CommonPrefixes string + CommonPrefixes []string } func (s *speedy) String() string { @@ -53,7 +53,10 @@ func (s *speedy) String() string { return fmt.Sprintf("speedy://%s/", uri.Host) } -func (s *speedy) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *speedy) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } uri, _ := url.ParseRequestURI(s.endpoint) query := url.Values{} diff --git a/pkg/object/sql.go b/pkg/object/sql.go index 980c9da44200..66738b6112c3 100644 --- a/pkg/object/sql.go +++ b/pkg/object/sql.go @@ -127,10 +127,14 @@ func (s *sqlStore) Delete(key string) error { return err } -func (s *sqlStore) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *sqlStore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if marker == "" { marker = prefix } + // todo + if delimiter != "" { + return nil, notSupportedDelimiter + } var bs []blob err := s.db.Where("`key` >= ?", marker).Limit(int(limit)).Cols("`key`", "size", "modified").OrderBy("`key`").Find(&bs) if err != nil { diff --git a/pkg/object/swift.go b/pkg/object/swift.go index e4cfe20c7d03..659f0adea022 100644 --- a/pkg/object/swift.go +++ b/pkg/object/swift.go @@ -26,6 +26,7 @@ import ( "net/url" "os" "strings" + "time" "github.com/juicedata/juicefs/pkg/utils" "github.com/ncw/swift" @@ -75,17 +76,30 @@ func (s *swiftOSS) Delete(key string) error { return err } -func (s *swiftOSS) List(prefix, marker string, limit int64) ([]Object, error) { +func (s *swiftOSS) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { if limit > 10000 { limit = 10000 } - objects, err := s.conn.Objects(s.container, &swift.ObjectsOpts{Prefix: prefix, Marker: marker, Limit: int(limit)}) + var delimiter_ rune + if delimiter != "" { + if len([]rune(delimiter)) == 1 { + delimiter_ = []rune(delimiter)[0] + } else { + return nil, fmt.Errorf("delimiter should be a rune but now is %s", delimiter) + } + } + objects, err := s.conn.Objects(s.container, &swift.ObjectsOpts{Prefix: prefix, Marker: marker, Delimiter: delimiter_, Limit: int(limit)}) if err != nil { return nil, err } var objs = make([]Object, len(objects)) for i, o := range objects { - objs[i] = &obj{o.Name, o.Bytes, o.LastModified, strings.HasSuffix(o.Name, "/")} + // https://docs.openstack.org/swift/latest/api/pseudo-hierarchical-folders-directories.html + if delimiter != "" && o.PseudoDirectory { + objs[i] = &obj{o.SubDir, 0, time.Unix(0, 0), true} + } else { + objs[i] = &obj{o.Name, o.Bytes, o.LastModified, strings.HasSuffix(o.Name, "/")} + } } return objs, nil } diff --git a/pkg/object/tikv.go b/pkg/object/tikv.go index 0b9a86f9ca2e..ea2f2dfc6b05 100644 --- a/pkg/object/tikv.go +++ b/pkg/object/tikv.go @@ -89,7 +89,10 @@ func (t *tikv) Delete(key string) error { return t.c.Delete(context.TODO(), []byte(key)) } -func (t *tikv) List(prefix, marker string, limit int64) ([]Object, error) { +func (t *tikv) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } if marker == "" { marker = prefix } @@ -101,7 +104,7 @@ func (t *tikv) List(prefix, marker string, limit int64) ([]Object, error) { if err != nil { return nil, err } - var objs []Object = make([]Object, len(keys)) + var objs = make([]Object, len(keys)) mtime := time.Now() for i, k := range keys { // FIXME: mtime diff --git a/pkg/object/ufile.go b/pkg/object/ufile.go index 6d446b2ef07c..3d7712c127ef 100644 --- a/pkg/object/ufile.go +++ b/pkg/object/ufile.go @@ -200,7 +200,10 @@ type uFileListObjectsOutput struct { DataSet []*DataItem `json:"DataSet,omitempty"` } -func (u *ufile) List(prefix, marker string, limit int64) ([]Object, error) { +func (u *ufile) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } query := url.Values{} query.Add("list", "") query.Add("prefix", prefix) diff --git a/pkg/object/upyun.go b/pkg/object/upyun.go index ac912ffc0422..2b7b5bfc804b 100644 --- a/pkg/object/upyun.go +++ b/pkg/object/upyun.go @@ -98,7 +98,10 @@ func (u *up) Copy(dst, src string) error { }) } -func (u *up) List(prefix, marker string, limit int64) ([]Object, error) { +func (u *up) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { + if delimiter != "" { + return nil, notSupportedDelimiter + } if u.listing == nil { listing := make(chan *upyun.FileInfo, limit) go func() { diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 6e09a9cc112f..c71d6e6111a0 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -115,7 +115,7 @@ func ListAll(store object.ObjectStorage, start, end string) (<-chan object.Objec marker := start logger.Debugf("Listing objects from %s marker %q", store, marker) - objs, err := store.List("", marker, maxResults) + objs, err := store.List("", marker, "", maxResults) if err != nil { logger.Errorf("Can't list %s: %s", store, err.Error()) return nil, err @@ -150,13 +150,13 @@ func ListAll(store object.ObjectStorage, start, end string) (<-chan object.Objec marker = lastkey startTime = time.Now() logger.Debugf("Continue listing objects from %s marker %q", store, marker) - objs, err = store.List("", marker, maxResults) + objs, err = store.List("", marker, "", maxResults) count := 0 for err != nil && count < 3 { logger.Warnf("Fail to list: %s, retry again", err.Error()) // slow down time.Sleep(time.Millisecond * 100) - objs, err = store.List("", marker, maxResults) + objs, err = store.List("", marker, "", maxResults) count++ } logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime))