diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index 91701d00ed3f0..c540092256ce9 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -230,24 +230,76 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e } // the keys are not ordered, sort them first sort.Strings(keys) + c.release(ctx) var objs = make(chan Object, 1000) - // TODO: parallel + var concurrent = 20 + ms := make([]sync.Mutex, concurrent) + conds := make([]*sync.Cond, concurrent) + ready := make([]bool, concurrent) + results := make([]Object, concurrent) + errs := make([]error, concurrent) + for c := 0; c < concurrent; c++ { + conds[c] = sync.NewCond(&ms[c]) + if c < len(keys) { + go func(c int) { + ctx, err := c.newContext() + if err != nil { + logger.Errorf("new context: %s", err) + errs[c] = err + return + } + defer ctx.Destroy() + for i := c; i < len(keys); i += concurrent { + key := keys[i] + st, err := ctx.Stat(key) + if err != nil { + if errors.Is(err, rados.ErrNotFound) { + logger.Debugf("Skip non-existent key: %s", key) + results[c] = nil + } else { + logger.Errorf("Stat key %s: %s", key, err) + errs[c] = err + } + } else { + results[c] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + } + + ms[c].Lock() + ready[c] = true + conds[c].Signal() + if errs[c] != nil { + ms[c].Unlock() + break + } + for ready[c] { + conds[c].Wait() + } + ms[c].Unlock() + } + }(c) + } + } go func() { defer close(objs) - defer ctx.Destroy() - for _, key := range keys { - st, err := ctx.Stat(key) - if err != nil { - if errors.Is(err, rados.ErrNotFound) { - logger.Debugf("Skip non-existent key: %s", key) - continue - } + for i := range keys { + c := i % concurrent + ms[c].Lock() + for !ready[c] { + conds[c].Wait() + } + if errs[c] != nil { objs <- nil - logger.Errorf("Stat key %s: %s", key, err) - return + ms[c].Unlock() + // some goroutines will be leaked, but it's ok + // since we won't call ListAll() many times in a process + break + } else if results[c] != nil { + objs <- results[c] } - objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + ready[c] = false + conds[c].Signal() + ms[c].Unlock() } }() return objs, nil