Skip to content

Commit

Permalink
object/ceph: list keys from ceph in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Nov 30, 2023
1 parent e89ecd3 commit 05a99e2
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions pkg/object/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,24 +230,76 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e
}
// the keys are not ordered, sort them first
sort.Strings(keys)
c.release(ctx)

var objs = make(chan Object, 1000)
// TODO: parallel
var concurrent = 20
ms := make([]sync.Mutex, concurrent)
conds := make([]*sync.Cond, concurrent)
ready := make([]bool, concurrent)
results := make([]Object, concurrent)
errs := make([]error, concurrent)
for c := 0; c < concurrent; c++ {
conds[c] = sync.NewCond(&ms[c])
if c < len(keys) {
go func(c int) {
ctx, err := c.newContext()

Check failure on line 246 in pkg/object/ceph.go

View workflow job for this annotation

GitHub Actions / build (1.18)

c.newContext undefined (type int has no field or method newContext)

Check failure on line 246 in pkg/object/ceph.go

View workflow job for this annotation

GitHub Actions / build (1.19)

c.newContext undefined (type int has no field or method newContext)

Check failure on line 246 in pkg/object/ceph.go

View workflow job for this annotation

GitHub Actions / build (1.20)

c.newContext undefined (type int has no field or method newContext)

Check failure on line 246 in pkg/object/ceph.go

View workflow job for this annotation

GitHub Actions / build (1.21)

c.newContext undefined (type int has no field or method newContext)
if err != nil {
logger.Errorf("new context: %s", err)
errs[c] = err
return
}
defer ctx.Destroy()
for i := c; i < len(keys); i += concurrent {
key := keys[i]
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Debugf("Skip non-existent key: %s", key)
results[c] = nil
} else {
logger.Errorf("Stat key %s: %s", key, err)
errs[c] = err
}
} else {
results[c] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
}

ms[c].Lock()
ready[c] = true
conds[c].Signal()
if errs[c] != nil {
ms[c].Unlock()
break
}
for ready[c] {
conds[c].Wait()
}
ms[c].Unlock()
}
}(c)
}
}
go func() {
defer close(objs)
defer ctx.Destroy()
for _, key := range keys {
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Debugf("Skip non-existent key: %s", key)
continue
}
for i := range keys {
c := i % concurrent
ms[c].Lock()
for !ready[c] {
conds[c].Wait()
}
if errs[c] != nil {
objs <- nil
logger.Errorf("Stat key %s: %s", key, err)
return
ms[c].Unlock()
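// The remaining worker goroutines are leaked here: once we stop consuming,
// nothing resets ready[c], so they stay blocked in conds[c].Wait() forever.
// This is accepted because ListAll() is not expected to be called many times in a process.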
break
} else if results[c] != nil {
objs <- results[c]
}
objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
ready[c] = false
conds[c].Signal()
ms[c].Unlock()
}
}()
return objs, nil
Expand Down

0 comments on commit 05a99e2

Please sign in to comment.