Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: wait() may miss signal then cause deadlock #3782

Merged
merged 2 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions pkg/object/object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ var bufPool = sync.Pool{
},
}

type listThread struct {
sync.Mutex
cond *utils.Cond
ready bool
err error
entries []Object
}

func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string) (<-chan Object, error) {
entries, err := store.List(prefix, "", "/", 1e9)
if err != nil {
Expand All @@ -191,72 +199,73 @@ func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string) (<-cha
listed := make(chan Object, 10240)
var walk func(string, []Object) error
walk = func(prefix string, entries []Object) error {
var err error
var concurrent = 10
ms := make([]sync.Mutex, concurrent)
conds := make([]*utils.Cond, concurrent)
ready := make([]bool, concurrent)
children := make([][]Object, len(entries))
var err error
threads := make([]listThread, concurrent)
for c := 0; c < concurrent; c++ {
conds[c] = utils.NewCond(&ms[c])
t := &threads[c]
t.cond = utils.NewCond(t)
go func(c int) {
for i := c; i < len(entries); i += concurrent {
key := entries[i].Key()
if !entries[i].IsDir() || key == prefix {
continue
}
if end != "" && key >= end {
break
}
if key < start && !strings.HasPrefix(start, key) {
continue
}
if !entries[i].IsDir() || key == prefix {
continue
}

children[i], err = store.List(key, "\x00", "/", 1e9) // exclude itself
ms[c].Lock()
ready[c] = true
conds[c].Signal()
ms[c].Unlock()

ms[c].Lock()
for ready[c] {
conds[c].WaitWithTimeout(time.Second)
t.entries, t.err = store.List(key, "\x00", "/", 1e9) // exclude itself
t.Lock()
t.ready = true
t.cond.Signal()
for t.ready {
t.cond.WaitWithTimeout(time.Second)
if err != nil {
t.Unlock()
return
}
}
ms[c].Unlock()
t.Unlock()
}
}(c)
}

for i, e := range entries {
if end != "" && e.Key() >= end {
key := e.Key()
if end != "" && key >= end {
return nil
}
if e.Key() >= start {
if key >= start {
listed <- e
} else if !strings.HasPrefix(start, e.Key()) {
} else if !strings.HasPrefix(start, key) {
continue
}
if !e.IsDir() || key == prefix {
continue
}
if e.IsDir() && e.Key() != prefix {
c := i % concurrent
ms[c].Lock()
for !ready[c] {
conds[c].Wait()
}
ready[c] = false
conds[c].Signal()
ms[c].Unlock()
if err != nil {
return err
}

err = walk(e.Key(), children[i])
if err != nil {
return err
}
children[i] = nil
t := &threads[i%concurrent]
t.Lock()
for !t.ready {
t.cond.WaitWithTimeout(time.Millisecond * 10)
}
if t.err != nil {
err = t.err
t.Unlock()
return err
}
t.ready = false
t.cond.Signal()
children := t.entries
t.Unlock()

err = walk(key, children)
if err != nil {
return err
}
}
return nil
Expand Down
8 changes: 0 additions & 8 deletions pkg/utils/cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func (c *Cond) Broadcast() {
c.signal = make(chan struct{})
}

// Wait until Signal() or Broadcast() is called.
func (c *Cond) Wait() {
ch := c.signal
c.L.Unlock()
<-ch
c.L.Lock()
}

var timerPool = sync.Pool{
New: func() interface{} {
return time.NewTimer(time.Second)
Expand Down
6 changes: 4 additions & 2 deletions pkg/utils/cond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ func TestCond(t *testing.T) {
go func() {
m.Lock()
wg.Done()
l.Wait()
for l.WaitWithTimeout(time.Second) {
}
l.Signal()
m.Unlock()
done <- true
}()
wg.Wait()
m.Lock()
l.Signal()
l.Wait()
for l.WaitWithTimeout(time.Second) {
}
m.Unlock()
select {
case <-done:
Expand Down