Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid race conditions and add a method to prevent channel leakage #7

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Run tests
run: go test -v -covermode=count
run: go test -v -covermode=count && go test -race -v

coverage:
runs-on: ubuntu-latest
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func main() {
// This function has to call to ensure all goroutines have finished
// after close the main program.
c.WaitAllDone()

// This function will release all the channels associated with goccm
c.Release()
}
```

Expand Down Expand Up @@ -76,5 +79,8 @@ func main() {

// Returns the number of goroutines which are running
c.RunningCount()

// Closes all the channels associated with goccm
c.Release()
}
```
26 changes: 21 additions & 5 deletions goccm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package goccm

import "sync/atomic"
import (
"sync/atomic"
)

type (
// ConcurrencyManager Interface
Expand All @@ -19,6 +21,9 @@ type (

// RunningCount Returns the number of goroutines which are running
RunningCount() int32

// Release Closes all the channels associated with goccm
Release()
}

concurrencyManager struct {
Expand All @@ -35,7 +40,7 @@ type (
allDoneCh chan bool

// The close flag allows we know when we can close the manager
closed bool
closed int32

// The running count allows we know the number of goroutines are running
runningCount int32
Expand Down Expand Up @@ -75,7 +80,7 @@ func (c *concurrencyManager) controller() {

// When the closed flag is set,
// we need to close the manager if it doesn't have any running goroutine
if c.closed && c.runningCount == 0 {
if c.IsClosed() && c.RunningCount() == 0 {
break
}
}
Expand All @@ -84,6 +89,10 @@ func (c *concurrencyManager) controller() {
c.allDoneCh <- true
}

func (c *concurrencyManager) IsClosed() bool {
return atomic.LoadInt32(&c.closed) > 0
}

// Wait until a slot is available for the new goroutine.
// A goroutine have to start after this function.
func (c *concurrencyManager) Wait() {
Expand All @@ -106,7 +115,7 @@ func (c *concurrencyManager) Done() {

// Close the manager manually
func (c *concurrencyManager) Close() {
c.closed = true
atomic.AddInt32(&c.closed, 1)
}

// WaitAllDone Wait for all goroutines are done
Expand All @@ -120,5 +129,12 @@ func (c *concurrencyManager) WaitAllDone() {

// RunningCount Returns the number of goroutines which are running
func (c *concurrencyManager) RunningCount() int32 {
return c.runningCount
return atomic.LoadInt32(&c.runningCount)
}

// Release Closes all the channels associated with goccm
func (c *concurrencyManager) Release() {
close(c.managerCh)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not close channels in Close() method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First it was there and I also think that it is more natural. But I think you expect to call the close twice from this test:
https://github.com/zenthangplus/goccm/blob/master/goccm_test.go#L22

close(c.doneCh)
close(c.allDoneCh)
}
11 changes: 6 additions & 5 deletions goccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goccm

import (
"fmt"
"sync/atomic"
"testing"
"time"
)
Expand All @@ -20,13 +21,13 @@ func TestExample(t *testing.T) {
}

func TestManuallyClose(t *testing.T) {
executedJobs := 0
executedJobs := int32(0)
c := New(3)
for i := 1; i <= 1000; i++ {
c.Wait()
go func() {
executedJobs++
fmt.Printf("Executed jobs %d\n", executedJobs)
atomic.AddInt32(&executedJobs, 1)
fmt.Printf("Executed jobs %d\n", atomic.LoadInt32(&executedJobs))
time.Sleep(2 * time.Second)
c.Done()
}()
Expand All @@ -46,8 +47,8 @@ func TestConcurrency(t *testing.T) {
c.Wait()
go func(i int) {
fmt.Printf("Current running jobs %d\n", c.RunningCount())
if c.RunningCount() > testMaxRunningJobs {
testMaxRunningJobs = c.RunningCount()
if c.RunningCount() > atomic.LoadInt32(&testMaxRunningJobs) {
atomic.StoreInt32(&testMaxRunningJobs, c.RunningCount())
}
time.Sleep(2 * time.Second)
c.Done()
Expand Down