From 05a99e23fde8b9b68c3909b6693de852709e9b3a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 30 Nov 2023 12:22:18 +0800 Subject: [PATCH 1/2] object/ceph: list keys from ceph in parallel --- pkg/object/ceph.go | 76 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index 91701d00ed3f..c540092256ce 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -230,24 +230,76 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e } // the keys are not ordered, sort them first sort.Strings(keys) + c.release(ctx) var objs = make(chan Object, 1000) - // TODO: parallel + var concurrent = 20 + ms := make([]sync.Mutex, concurrent) + conds := make([]*sync.Cond, concurrent) + ready := make([]bool, concurrent) + results := make([]Object, concurrent) + errs := make([]error, concurrent) + for c := 0; c < concurrent; c++ { + conds[c] = sync.NewCond(&ms[c]) + if c < len(keys) { + go func(c int) { + ctx, err := c.newContext() + if err != nil { + logger.Errorf("new context: %s", err) + errs[c] = err + return + } + defer ctx.Destroy() + for i := c; i < len(keys); i += concurrent { + key := keys[i] + st, err := ctx.Stat(key) + if err != nil { + if errors.Is(err, rados.ErrNotFound) { + logger.Debugf("Skip non-existent key: %s", key) + results[c] = nil + } else { + logger.Errorf("Stat key %s: %s", key, err) + errs[c] = err + } + } else { + results[c] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + } + + ms[c].Lock() + ready[c] = true + conds[c].Signal() + if errs[c] != nil { + ms[c].Unlock() + break + } + for ready[c] { + conds[c].Wait() + } + ms[c].Unlock() + } + }(c) + } + } go func() { defer close(objs) - defer ctx.Destroy() - for _, key := range keys { - st, err := ctx.Stat(key) - if err != nil { - if errors.Is(err, rados.ErrNotFound) { - logger.Debugf("Skip non-existent key: %s", key) - continue - } + for i := range keys { + c := i % concurrent + ms[c].Lock() + for !ready[c] { + conds[c].Wait() + } + if errs[c] != nil { objs <- nil - logger.Errorf("Stat key %s: %s", key, err) - return + ms[c].Unlock() + // some goroutines will be leaked, but it's ok + // since we won't call ListAll() many times in a process + break + } else if results[c] != nil { + objs <- results[c] } - objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + ready[c] = false + conds[c].Signal() + ms[c].Unlock() } }() return objs, nil From 20a2780de8d25f9673bf73b801a579a47a5d4ece Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 30 Nov 2023 13:33:51 +0800 Subject: [PATCH 2/2] fix build --- pkg/object/ceph.go | 58 +++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index c540092256ce..4d0954ded425 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -239,67 +239,67 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e ready := make([]bool, concurrent) results := make([]Object, concurrent) errs := make([]error, concurrent) - for c := 0; c < concurrent; c++ { - conds[c] = sync.NewCond(&ms[c]) - if c < len(keys) { - go func(c int) { + for j := 0; j < concurrent; j++ { + conds[j] = sync.NewCond(&ms[j]) + if j < len(keys) { + go func(j int) { ctx, err := c.newContext() if err != nil { logger.Errorf("new context: %s", err) - errs[c] = err + errs[j] = err return } defer ctx.Destroy() - for i := c; i < len(keys); i += concurrent { + for i := j; i < len(keys); i += concurrent { key := keys[i] st, err := ctx.Stat(key) if err != nil { if errors.Is(err, rados.ErrNotFound) { logger.Debugf("Skip non-existent key: %s", key) - results[c] = nil + results[j] = nil } else { logger.Errorf("Stat key %s: %s", key, err) - errs[c] = err + errs[j] = err } } else { - results[c] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + results[j] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} } - ms[c].Lock() - ready[c] = true - conds[c].Signal() - if errs[c] != nil { - ms[c].Unlock() + ms[j].Lock() + ready[j] = true + conds[j].Signal() + if errs[j] != nil { + ms[j].Unlock() break } - for ready[c] { - conds[c].Wait() + for ready[j] { + conds[j].Wait() } - ms[c].Unlock() + ms[j].Unlock() } - }(c) + }(j) } } go func() { defer close(objs) for i := range keys { - c := i % concurrent - ms[c].Lock() - for !ready[c] { - conds[c].Wait() + j := i % concurrent + ms[j].Lock() + for !ready[j] { + conds[j].Wait() } - if errs[c] != nil { + if errs[j] != nil { objs <- nil - ms[c].Unlock() + ms[j].Unlock() // some goroutines will be leaked, but it's ok // since we won't call ListAll() many times in a process break - } else if results[c] != nil { - objs <- results[c] + } else if results[j] != nil { + objs <- results[j] } - ready[c] = false - conds[c].Signal() - ms[c].Unlock() + ready[j] = false + conds[j].Signal() + ms[j].Unlock() } }() return objs, nil