diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index 91701d00ed3f..4d0954ded425 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -230,24 +230,76 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e } // the keys are not ordered, sort them first sort.Strings(keys) + c.release(ctx) var objs = make(chan Object, 1000) - // TODO: parallel + var concurrent = 20 + ms := make([]sync.Mutex, concurrent) + conds := make([]*sync.Cond, concurrent) + ready := make([]bool, concurrent) + results := make([]Object, concurrent) + errs := make([]error, concurrent) + for j := 0; j < concurrent; j++ { + conds[j] = sync.NewCond(&ms[j]) + if j < len(keys) { + go func(j int) { + ctx, err := c.newContext() + if err != nil { + logger.Errorf("new context: %s", err) + errs[j] = err + return + } + defer ctx.Destroy() + for i := j; i < len(keys); i += concurrent { + key := keys[i] + st, err := ctx.Stat(key) + if err != nil { + if errors.Is(err, rados.ErrNotFound) { + logger.Debugf("Skip non-existent key: %s", key) + results[j] = nil + } else { + logger.Errorf("Stat key %s: %s", key, err) + errs[j] = err + } + } else { + results[j] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + } + + ms[j].Lock() + ready[j] = true + conds[j].Signal() + if errs[j] != nil { + ms[j].Unlock() + break + } + for ready[j] { + conds[j].Wait() + } + ms[j].Unlock() + } + }(j) + } + } go func() { defer close(objs) - defer ctx.Destroy() - for _, key := range keys { - st, err := ctx.Stat(key) - if err != nil { - if errors.Is(err, rados.ErrNotFound) { - logger.Debugf("Skip non-existent key: %s", key) - continue - } + for i := range keys { + j := i % concurrent + ms[j].Lock() + for !ready[j] { + conds[j].Wait() + } + if errs[j] != nil { objs <- nil - logger.Errorf("Stat key %s: %s", key, err) - return + ms[j].Unlock() + // some goroutines will be leaked, but it's ok + // since we won't call ListAll() many times in a process + break + } else if results[j] != nil { + objs <- results[j] } - objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + ready[j] = false + conds[j].Signal() + ms[j].Unlock() } }() return objs, nil