Skip to content

Commit 04acb16

Browse files
authored
Task queue does not need to be buffered (#65)
* Task queue does not need to be buffered There is no need to buffer the task queue. Doing so does not enable any async behavior. * Doc spelling and formatting updates. * update goleak * update workflow
1 parent 56fe285 commit 04acb16

File tree

8 files changed

+92
-73
lines changed

8 files changed

+92
-73
lines changed

.github/workflows/go.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
- name: Set up Go
1717
uses: actions/setup-go@v2
1818
with:
19-
go-version: 1.16
19+
go-version: 1.18
2020

2121
- name: Vet
2222
run: go vet ./...

README.md

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ This implementation builds on ideas from the following:
1414
- http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html
1515

1616
## Installation
17-
To install this package, you need to setup your Go workspace. The simplest way to install the library is to run:
17+
18+
To install this package, you need to setup your Go workspace. The simplest way to install the library is to run:
1819
```
1920
$ go get github.com/gammazero/workerpool
2021
```
2122

2223
## Example
24+
2325
```go
2426
package main
2527

@@ -43,6 +45,8 @@ func main() {
4345
}
4446
```
4547

48+
[Example wrapper function](https://go.dev/play/p/BWnRhJYarZ1) to show start and finish time of submitted function.
49+
4650
## Usage Note
4751

48-
There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool. If should be solved by distributing load over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc.
52+
There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool. It should be solved by distributing workload over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc.

doc.go

+25-25
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,57 @@
11
/*
22
Package workerpool queues work to a limited number of goroutines.
33
4-
The purpose of the worker pool is to limit the concurrency of tasks
5-
executed by the workers. This is useful when performing tasks that require
6-
sufficient resources (CPU, memory, etc.), and running too many tasks at the
7-
same time would exhaust resources.
4+
The purpose of the worker pool is to limit the concurrency of tasks executed by
5+
the workers. This is useful when performing tasks that require sufficient
6+
resources (CPU, memory, etc.), and running too many tasks at the same time
7+
would exhaust resources.
88
99
Non-blocking task submission
1010
11-
A task is a function submitted to the worker pool for execution. Submitting
11+
A task is a function submitted to the worker pool for execution. Submitting
1212
tasks to this worker pool will not block, regardless of the number of tasks.
13-
Incoming tasks are immediately dispatched to an available
14-
worker. If no worker is immediately available, or there are already tasks
15-
waiting for an available worker, then the task is put on a waiting queue to
16-
wait for an available worker.
13+
Incoming tasks are immediately dispatched to an available worker. If no worker
14+
is immediately available, or there are already tasks waiting for an available
15+
worker, then the task is put on a waiting queue to wait for an available
16+
worker.
1717
1818
The intent of the worker pool is to limit the concurrency of task execution,
19-
not limit the number of tasks queued to be executed. Therefore, this unbounded
20-
input of tasks is acceptable as the tasks cannot be discarded. If the number
21-
of inbound tasks is too many to even queue for pending processing, then the
22-
solution is outside the scope of workerpool, and should be solved by
19+
not limit the number of tasks queued to be executed. Therefore, this unbounded
20+
input of tasks is acceptable as the tasks cannot be discarded. If the number of
21+
inbound tasks is too many to even queue for pending processing, then the
22+
solution is outside the scope of workerpool. It should be solved by
2323
distributing load over multiple systems, and/or storing input for pending
2424
processing in intermediate storage such as a database, file system, distributed
2525
message queue, etc.
2626
2727
Dispatcher
2828
2929
This worker pool uses a single dispatcher goroutine to read tasks from the
30-
input task queue and dispatch them to worker goroutines. This allows for a
30+
input task queue and dispatch them to worker goroutines. This allows for a
3131
small input channel, and lets the dispatcher queue as many tasks as are
32-
submitted when there are no available workers. Additionally, the dispatcher
33-
can adjust the number of workers as appropriate for the work load, without
34-
having to utilize locked counters and checks incurred on task submission.
32+
submitted when there are no available workers. Additionally, the dispatcher can
33+
adjust the number of workers as appropriate for the work load, without having
34+
to utilize locked counters and checks incurred on task submission.
3535
3636
When no tasks have been submitted for a period of time, a worker is removed by
37-
the dispatcher. This is done until there are no more workers to remove. The
37+
the dispatcher. This is done until there are no more workers to remove. The
3838
minimum number of workers is always zero, because the time to start new workers
3939
is insignificant.
4040
4141
Usage note
4242
4343
It is advisable to use different worker pools for tasks that are bound by
44-
different resources, or that have different resource use patterns. For
45-
example, tasks that use X Mb of memory may need different concurrency limits
46-
than tasks that use Y Mb of memory.
44+
different resources, or that have different resource use patterns. For example,
45+
tasks that use X Mb of memory may need different concurrency limits than tasks
46+
that use Y Mb of memory.
4747
4848
Waiting queue vs goroutines
4949
5050
When there are no available workers to handle incoming tasks, the tasks are put
51-
on a waiting queue, in this implementation. In implementations mentioned in
52-
the credits below, these tasks were passed to goroutines. Using a queue is
53-
faster and has less memory overhead than creating a separate goroutine for each
54-
waiting task, allowing a much higher number of waiting tasks. Also, using a
51+
on a waiting queue, in this implementation. In implementations mentioned in the
52+
credits below, these tasks were passed to goroutines. Using a queue is faster
53+
and has less memory overhead than creating a separate goroutine for each
54+
waiting task, allowing a much higher number of waiting tasks. Also, using a
5555
waiting queue ensures that tasks are given to workers in the order the tasks
5656
were received.
5757

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/gammazero/workerpool
22

33
require (
44
github.com/gammazero/deque v0.2.0
5-
go.uber.org/goleak v1.1.10
5+
go.uber.org/goleak v1.1.12
66
)
77

88
go 1.18

go.sum

+24-8
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,39 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
88
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
99
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1010
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
11-
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
12-
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
13-
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
14-
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
11+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
12+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
13+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
14+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
15+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
1516
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
17+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
1618
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
1719
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
20+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
1821
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
22+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
1923
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
24+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
2025
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
26+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
2127
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
28+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
29+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
30+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
31+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
32+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
2233
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
34+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
35+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
2336
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
24-
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
25-
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
37+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
38+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
39+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
2640
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
41+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
42+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
2743
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2844
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
29-
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
30-
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
45+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
46+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pacer/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
[![GoDoc](https://godoc.org/github.com/gammazero/workerpool/pacer?status.svg)](https://godoc.org/github.com/gammazero/workerpool/pacer)
44

5-
A utility to limit the rate at which concurrent goroutines begin execution. This addresses situations where running the concurrent goroutines is OK, as long as their execution does not start at the same time.
5+
A utility to limit the rate at which concurrent goroutines begin execution. This addresses situations where running the concurrent goroutines is OK, as long as their execution does not start at the same time.
66

7-
The pacer package is independent of the workerpool package. Paced functions can be submitted to a workerpool or can be run as goroutines, and execution will be paced in both cases.
7+
The pacer package is independent of the workerpool package. Paced functions can be submitted to a workerpool or can be run as goroutines, and execution will be paced in both cases.

pacer/pacer.go

+12-13
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
/*
22
Package pacer provides a utility to limit the rate at which concurrent
3-
goroutines begin execution. This addresses situations where running the
3+
goroutines begin execution. This addresses situations where running the
44
concurrent goroutines is OK, as long as their execution does not start at the
55
same time.
66
7-
The pacer package is independent of the workerpool package. Paced functions
8-
can be submitted to a workerpool or can be run as goroutines, and execution
9-
will be paced in both cases.
7+
The pacer package is independent of the workerpool package. Paced functions can
8+
be submitted to a workerpool or can be run as goroutines, and execution will be
9+
paced in both cases.
1010
1111
*/
1212
package pacer
1313

1414
import "time"
1515

16-
// Pacer is a goroutine rate limiter. When concurrent goroutines call
16+
// Pacer is a goroutine rate limiter. When concurrent goroutines call
1717
// Pacer.Next(), the call returns in a single goroutine at a time, at a rate no
1818
// faster than one per delay time.
1919
//
2020
// To use Pacer, create a new Pacer giving the interval that must elapse
21-
// between the start of one task the start of the next task. Then call Pace(),
22-
// passing your task function. A new paced task function is returned that can
21+
// between the start of one task the start of the next task. Then call Pace(),
22+
// passing your task function. A new paced task function is returned that can
2323
// then be passed to WorkerPool's Submit() or SubmitWait(), or called as a go
24-
// routine. Paced functions, that are run as goroutines, are also paced. For
24+
// routine. Paced functions, that are run as goroutines, are also paced. For
2525
// example:
2626
//
2727
// pacer := pacer.NewPacer(time.Second)
@@ -57,7 +57,7 @@ func NewPacer(delay time.Duration) *Pacer {
5757
return p
5858
}
5959

60-
// Pace wraps a function in a paced function. The returned paced function can
60+
// Pace wraps a function in a paced function. The returned paced function can
6161
// then be submitted to the workerpool, using Submit or SubmitWait, and
6262
// starting the tasks is paced according to the pacer's delay.
6363
func (p *Pacer) Pace(task func()) func() {
@@ -73,7 +73,7 @@ func (p *Pacer) Next() {
7373
p.gate <- struct{}{}
7474
}
7575

76-
// Stop stops the Pacer from running. Do not call until all paced tasks have
76+
// Stop stops the Pacer from running. Do not call until all paced tasks have
7777
// completed, or paced tasks will hang waiting for pacer to unblock them.
7878
func (p *Pacer) Stop() {
7979
close(p.gate)
@@ -97,9 +97,8 @@ func (p *Pacer) Resume() {
9797
}
9898

9999
func (p *Pacer) run() {
100-
// Read item from gate no faster than one per delay.
101-
// Reading from the unbuffered channel serves as a "tick"
102-
// and unblocks the writer.
100+
// Read item from gate no faster than one per delay. Reading from the
101+
// unbuffered channel serves as a "tick" and unblocks the writer.
103102
for range p.gate {
104103
time.Sleep(p.delay)
105104
p.pause <- struct{}{} // will wait here if channel blocked

0 commit comments

Comments
 (0)