Skip to content

Commit

Permalink
Example TaskPool: cancel if a worker fails
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Jul 7, 2023
1 parent 20f9f4a commit 5302224
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,24 +235,25 @@ func main() {
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Make a pool of 20 digesters
in, out := flowmatic.TaskPool(20, func(path string) (*[md5.Size]byte, error) {
return digest(ctx, path)
})
in, out := flowmatic.TaskPool(20, digest)

m := make(map[string][md5.Size]byte)
// Open two goroutines:
// one for reading file names by walking the filesystem
// one for recording results from the digesters in a map
err := flowmatic.Do(
func() error {
return walkFilesystem(ctx, root, in)
},
func() error { return walkFilesystem(ctx, root, in) },
func() error {
for r := range out {
if r.Out != nil {
m[r.In] = *r.Out
if r.Err != nil {
cancel()
return r.Err
}
m[r.In] = *r.Out
}
return nil
},
Expand Down Expand Up @@ -281,13 +282,13 @@ func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
})
}

func digest(ctx context.Context, path string) (*[md5.Size]byte, error) {
func digest(path string) (*[md5.Size]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
hash := md5.Sum(data)
return &hash, ctx.Err()
return &hash, nil
}
```

Expand Down
21 changes: 11 additions & 10 deletions taskpool_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ func ExampleTaskPool() {
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Make a pool of 20 digesters
in, out := flowmatic.TaskPool(20, func(path string) (*[md5.Size]byte, error) {
return digest(ctx, path)
})
in, out := flowmatic.TaskPool(20, digest)

m := make(map[string][md5.Size]byte)
// Open two goroutines:
// one for reading file names by walking the filesystem
// one for recording results from the digesters in a map
err := flowmatic.Do(
func() error {
return walkFilesystem(ctx, root, in)
},
func() error { return walkFilesystem(ctx, root, in) },
func() error {
for r := range out {
if r.Out != nil {
m[r.In] = *r.Out
if r.Err != nil {
cancel()
return r.Err
}
m[r.In] = *r.Out
}
return nil
},
Expand Down Expand Up @@ -78,11 +79,11 @@ func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
})
}

func digest(ctx context.Context, path string) (*[md5.Size]byte, error) {
func digest(path string) (*[md5.Size]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
hash := md5.Sum(data)
return &hash, ctx.Err()
return &hash, nil
}

0 comments on commit 5302224

Please sign in to comment.