From ef82870078a90aa2d8fc8f0d814e35e81d872ad6 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Mon, 10 Aug 2020 17:21:57 +0200 Subject: [PATCH] Cap root module loading via worker pool (#256) * deps: Add github.com/gammazero/workerpool * Cap root module loading via worker pool --- go.mod | 1 + go.sum | 4 + .../rootmodule/root_module_manager.go | 26 +- internal/terraform/rootmodule/types.go | 2 + langserver/handlers/service.go | 9 + langserver/handlers/tick_reporter.go | 40 +++ vendor/github.com/gammazero/deque/.gitignore | 26 ++ vendor/github.com/gammazero/deque/.travis.yml | 15 + vendor/github.com/gammazero/deque/LICENSE | 21 ++ vendor/github.com/gammazero/deque/README.md | 73 +++++ vendor/github.com/gammazero/deque/deque.go | 243 +++++++++++++++++ vendor/github.com/gammazero/deque/doc.go | 34 +++ vendor/github.com/gammazero/deque/go.mod | 3 + vendor/github.com/gammazero/deque/go.test.sh | 12 + .../gammazero/workerpool/.gitignore | 28 ++ .../gammazero/workerpool/.travis.yml | 15 + .../github.com/gammazero/workerpool/LICENSE | 21 ++ .../github.com/gammazero/workerpool/README.md | 52 ++++ vendor/github.com/gammazero/workerpool/doc.go | 66 +++++ vendor/github.com/gammazero/workerpool/go.mod | 5 + vendor/github.com/gammazero/workerpool/go.sum | 8 + .../gammazero/workerpool/go.test.sh | 12 + .../gammazero/workerpool/workerpool.go | 257 ++++++++++++++++++ vendor/modules.txt | 4 + 24 files changed, 973 insertions(+), 4 deletions(-) create mode 100644 langserver/handlers/tick_reporter.go create mode 100644 vendor/github.com/gammazero/deque/.gitignore create mode 100644 vendor/github.com/gammazero/deque/.travis.yml create mode 100644 vendor/github.com/gammazero/deque/LICENSE create mode 100644 vendor/github.com/gammazero/deque/README.md create mode 100644 vendor/github.com/gammazero/deque/deque.go create mode 100644 vendor/github.com/gammazero/deque/doc.go create mode 100644 vendor/github.com/gammazero/deque/go.mod create mode 100644 vendor/github.com/gammazero/deque/go.test.sh create mode 100644 vendor/github.com/gammazero/workerpool/.gitignore create mode 100644 vendor/github.com/gammazero/workerpool/.travis.yml create mode 100644 vendor/github.com/gammazero/workerpool/LICENSE create mode 100644 vendor/github.com/gammazero/workerpool/README.md create mode 100644 vendor/github.com/gammazero/workerpool/doc.go create mode 100644 vendor/github.com/gammazero/workerpool/go.mod create mode 100644 vendor/github.com/gammazero/workerpool/go.sum create mode 100644 vendor/github.com/gammazero/workerpool/go.test.sh create mode 100644 vendor/github.com/gammazero/workerpool/workerpool.go diff --git a/go.mod b/go.mod index 8445c06de..1ab669002 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/apparentlymart/go-textseg v1.0.0 github.com/creachadair/jrpc2 v0.10.0 github.com/fsnotify/fsnotify v1.4.9 + github.com/gammazero/workerpool v1.0.0 github.com/google/go-cmp v0.4.1 github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/go-version v1.2.0 diff --git a/go.sum b/go.sum index ab86451d1..fad35434e 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,10 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/workerpool v1.0.0 h1:MfkJc6KL0tAmjrRDS203AZz3F+84Uod9YbL8KjpcQ00= +github.com/gammazero/workerpool v1.0.0/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/internal/terraform/rootmodule/root_module_manager.go b/internal/terraform/rootmodule/root_module_manager.go index ccc2af71c..89e0c0a17 100644 --- a/internal/terraform/rootmodule/root_module_manager.go +++ b/internal/terraform/rootmodule/root_module_manager.go @@ -6,9 +6,11 @@ import ( "log" "os" "path/filepath" + "runtime" "strings" "time" + "github.com/gammazero/workerpool" "github.com/hashicorp/terraform-config-inspect/tfconfig" "github.com/hashicorp/terraform-ls/internal/terraform/discovery" "github.com/hashicorp/terraform-ls/internal/terraform/exec" @@ -22,6 +24,7 @@ type rootModuleManager struct { filesystem tfconfig.FS syncLoading bool + workerPool *workerpool.WorkerPool logger *log.Logger // terraform discovery @@ -40,9 +43,14 @@ func NewRootModuleManager(fs tfconfig.FS) RootModuleManager { func newRootModuleManager(fs tfconfig.FS) *rootModuleManager { d := &discovery.Discovery{} + + defaultSize := 3 * runtime.NumCPU() + wp := workerpool.New(defaultSize) + rmm := &rootModuleManager{ rms: make([]*rootModule, 0), filesystem: fs, + workerPool: wp, logger: defaultLogger, tfDiscoFunc: d.LookPath, tfNewExecutor: exec.NewExecutor, @@ -51,6 +59,14 @@ func newRootModuleManager(fs tfconfig.FS) *rootModuleManager { return rmm } +func (rmm *rootModuleManager) WorkerPoolSize() int { + return rmm.workerPool.Size() +} + +func (rmm *rootModuleManager) WorkerQueueSize() int { + return rmm.workerPool.WaitingQueueSize() +} + func (rmm *rootModuleManager) defaultRootModuleFactory(ctx context.Context, dir string) (*rootModule, error) { rm := newRootModule(rmm.filesystem, dir) @@ -106,10 +122,11 @@ func (rmm *rootModuleManager) AddAndStartLoadingRootModule(ctx context.Context, } rmm.logger.Printf("asynchronously loading root module %s", dir) - err = rm.StartLoading() - if err != nil { - return rm, err - } + rmm.workerPool.Submit(func() { + rm := rm + err := rm.load(context.Background()) + rm.setLoadErr(err) + }) return rm, nil } @@ -253,6 +270,7 @@ func (rmm *rootModuleManager) CancelLoading() { rm.CancelLoading() rmm.logger.Printf("loading cancelled for %s", rm.Path()) } + rmm.workerPool.Stop() } // rootModuleDirFromPath strips known lock file paths and filenames diff --git a/internal/terraform/rootmodule/types.go b/internal/terraform/rootmodule/types.go index 154280f71..fd4f3159e 100644 --- a/internal/terraform/rootmodule/types.go +++ b/internal/terraform/rootmodule/types.go @@ -43,6 +43,8 @@ type RootModuleManager interface { SetTerraformExecTimeout(timeout time.Duration) AddAndStartLoadingRootModule(ctx context.Context, dir string) (RootModule, error) + WorkerPoolSize() int + WorkerQueueSize() int ListRootModules() RootModules PathsToWatch() []string RootModuleByPath(path string) (RootModule, error) diff --git a/langserver/handlers/service.go b/langserver/handlers/service.go index 842dcc307..8ef9600aa 100644 --- a/langserver/handlers/service.go +++ b/langserver/handlers/service.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "time" "github.com/creachadair/jrpc2" "github.com/creachadair/jrpc2/code" @@ -73,6 +74,14 @@ func (svc *service) Assigner() (jrpc2.Assigner, error) { svc.modMgr = svc.newRootModuleManager(fs) svc.modMgr.SetLogger(svc.logger) + svc.logger.Printf("Worker pool size set to %d", svc.modMgr.WorkerPoolSize()) + + tr := newTickReporter(5 * time.Second) + tr.AddReporter(func() { + svc.logger.Printf("Root modules waiting to be loaded: %d", svc.modMgr.WorkerQueueSize()) + }) + tr.StartReporting(svc.sessCtx) + svc.walker = svc.newWalker() // The following is set via CLI flags, hence available in the server context diff --git a/langserver/handlers/tick_reporter.go b/langserver/handlers/tick_reporter.go new file mode 100644 index 000000000..55a8e8e26 --- /dev/null +++ b/langserver/handlers/tick_reporter.go @@ -0,0 +1,40 @@ +package handlers + +import ( + "context" + "time" +) + +func newTickReporter(d time.Duration) *tickReporter { + return &tickReporter{ + t: time.NewTicker(d), + rfs: make([]reportFunc, 0), + } +} + +type reportFunc func() + +type tickReporter struct { + t *time.Ticker + rfs []reportFunc +} + +func (tr *tickReporter) AddReporter(f reportFunc) { + tr.rfs = append(tr.rfs, f) +} + +func (tr *tickReporter) StartReporting(ctx context.Context) { + go func(ctx context.Context, tr *tickReporter) { + for { + select { + case <-ctx.Done(): + tr.t.Stop() + return + case <-tr.t.C: + for _, rf := range tr.rfs { + rf() + } + } + } + }(ctx, tr) +} diff --git a/vendor/github.com/gammazero/deque/.gitignore b/vendor/github.com/gammazero/deque/.gitignore new file mode 100644 index 000000000..b33406fb0 --- /dev/null +++ b/vendor/github.com/gammazero/deque/.gitignore @@ -0,0 +1,26 @@ +*~ + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/gammazero/deque/.travis.yml b/vendor/github.com/gammazero/deque/.travis.yml new file mode 100644 index 000000000..69e6ab3b3 --- /dev/null +++ b/vendor/github.com/gammazero/deque/.travis.yml @@ -0,0 +1,15 @@ +language: go + +go: + - 1.13.x + - 1.14.x + - tip + +before_script: + - go vet ./... + +script: + - ./go.test.sh + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/gammazero/deque/LICENSE b/vendor/github.com/gammazero/deque/LICENSE new file mode 100644 index 000000000..0566f2661 --- /dev/null +++ b/vendor/github.com/gammazero/deque/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Andrew J. Gillis + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/gammazero/deque/README.md b/vendor/github.com/gammazero/deque/README.md new file mode 100644 index 000000000..66271db51 --- /dev/null +++ b/vendor/github.com/gammazero/deque/README.md @@ -0,0 +1,73 @@ +# deque + +[![Build Status](https://travis-ci.com/gammazero/deque.svg)](https://travis-ci.com/gammazero/deque) +[![Go Report Card](https://goreportcard.com/badge/github.com/gammazero/deque)](https://goreportcard.com/report/github.com/gammazero/deque) +[![codecov](https://codecov.io/gh/gammazero/deque/branch/master/graph/badge.svg)](https://codecov.io/gh/gammazero/deque) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) + +Extremely fast ring-buffer deque ([double-ended queue](https://en.wikipedia.org/wiki/Double-ended_queue)) implementation. + +[![GoDoc](https://godoc.org/github.com/gammazero/deque?status.svg)](https://godoc.org/github.com/gammazero/deque) + +For a pictorial description, see the [Deque diagram](https://github.com/gammazero/deque/wiki) + +## Installation + +``` +$ go get github.com/gammazero/deque +``` + +## Deque data structure + +Deque generalizes a queue and a stack, to efficiently add and remove items at either end with O(1) performance. [Queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)) (FIFO) operations are supported using `PushBack()` and `PopFront()`. [Stack](https://en.wikipedia.org/wiki/Stack_(abstract_data_type)) (LIFO) operations are supported using `PushBack()` and `PopBack()`. + +## Ring-buffer Performance + +This deque implementation is optimized for CPU and GC performance. The circular buffer automatically re-sizes by powers of two, growing when additional capacity is needed and shrinking when only a quarter of the capacity is used, and uses bitwise arithmetic for all calculations. Since growth is by powers of two, adding elements will only cause O(log n) allocations. + +The ring-buffer implementation significantly improves memory and time performance with fewer GC pauses, compared to implementations based on slices and linked lists. By wrapping around the buffer, previously used space is reused, making allocation unnecessary until all buffer capacity is used. + +For maximum speed, this deque implementation leaves concurrency safety up to the application to provide, however the application chooses, if needed at all. + +## Reading Empty Deque + +Since it is OK for the deque to contain a nil value, it is necessary to either panic or return a second boolean value to indicate the deque is empty, when reading or removing an element. This deque panics when reading from an empty deque. This is a run-time check to help catch programming errors, which may be missed if a second return value is ignored. Simply check Deque.Len() before reading from the deque. + +## Example + +```go +package main + +import ( + "fmt" + "github.com/gammazero/deque" +) + +func main() { + var q deque.Deque + q.PushBack("foo") + q.PushBack("bar") + q.PushBack("baz") + + fmt.Println(q.Len()) // Prints: 3 + fmt.Println(q.Front()) // Prints: foo + fmt.Println(q.Back()) // Prints: baz + + q.PopFront() // remove "foo" + q.PopBack() // remove "baz" + + q.PushFront("hello") + q.PushBack("world") + + // Consume deque and print elements. + for q.Len() != 0 { + fmt.Println(q.PopFront()) + } +} +``` + +## Uses + +Deque can be used as both a: +- [Queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)) using `PushBack` and `PopFront` +- [Stack](https://en.wikipedia.org/wiki/Stack_(abstract_data_type)) using `PushBack` and `PopBack` diff --git a/vendor/github.com/gammazero/deque/deque.go b/vendor/github.com/gammazero/deque/deque.go new file mode 100644 index 000000000..aa9d34eed --- /dev/null +++ b/vendor/github.com/gammazero/deque/deque.go @@ -0,0 +1,243 @@ +package deque + +// minCapacity is the smallest capacity that deque may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minCapacity = 16 + +// Deque represents a single instance of the deque data structure. +type Deque struct { + buf []interface{} + head int + tail int + count int + minCap int +} + +// Len returns the number of elements currently stored in the queue. +func (q *Deque) Len() int { + return q.count +} + +// PushBack appends an element to the back of the queue. Implements FIFO when +// elements are removed with PopFront(), and LIFO when elements are removed +// with PopBack(). +func (q *Deque) PushBack(elem interface{}) { + q.growIfFull() + + q.buf[q.tail] = elem + // Calculate new tail position. + q.tail = q.next(q.tail) + q.count++ +} + +// PushFront prepends an element to the front of the queue. +func (q *Deque) PushFront(elem interface{}) { + q.growIfFull() + + // Calculate new head position. + q.head = q.prev(q.head) + q.buf[q.head] = elem + q.count++ +} + +// PopFront removes and returns the element from the front of the queue. +// Implements FIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque) PopFront() interface{} { + if q.count <= 0 { + panic("deque: PopFront() called on empty queue") + } + ret := q.buf[q.head] + q.buf[q.head] = nil + // Calculate new head position. + q.head = q.next(q.head) + q.count-- + + q.shrinkIfExcess() + return ret +} + +// PopBack removes and returns the element from the back of the queue. +// Implements LIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque) PopBack() interface{} { + if q.count <= 0 { + panic("deque: PopBack() called on empty queue") + } + + // Calculate new tail position + q.tail = q.prev(q.tail) + + // Remove value at tail. + ret := q.buf[q.tail] + q.buf[q.tail] = nil + q.count-- + + q.shrinkIfExcess() + return ret +} + +// Front returns the element at the front of the queue. This is the element +// that would be returned by PopFront(). This call panics if the queue is +// empty. +func (q *Deque) Front() interface{} { + if q.count <= 0 { + panic("deque: Front() called when empty") + } + return q.buf[q.head] +} + +// Back returns the element at the back of the queue. This is the element +// that would be returned by PopBack(). This call panics if the queue is +// empty. +func (q *Deque) Back() interface{} { + if q.count <= 0 { + panic("deque: Back() called when empty") + } + return q.buf[q.prev(q.tail)] +} + +// At returns the element at index i in the queue without removing the element +// from the queue. This method accepts only non-negative index values. At(0) +// refers to the first element and is the same as Front(). At(Len()-1) refers +// to the last element and is the same as Back(). If the index is invalid, the +// call panics. +// +// The purpose of At is to allow Deque to serve as a more general purpose +// circular buffer, where items are only added to and removed from the ends of +// the deque, but may be read from any place within the deque. Consider the +// case of a fixed-size circular log buffer: A new entry is pushed onto one end +// and when full the oldest is popped from the other end. All the log entries +// in the buffer must be readable without altering the buffer contents. +func (q *Deque) At(i int) interface{} { + if i < 0 || i >= q.count { + panic("deque: At() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Clear removes all elements from the queue, but retains the current capacity. +// This is useful when repeatedly reusing the queue at high frequency to avoid +// GC during reuse. The queue will not be resized smaller as long as items are +// only added. Only when items are removed is the queue subject to getting +// resized smaller. +func (q *Deque) Clear() { + // bitwise modulus + modBits := len(q.buf) - 1 + for h := q.head; h != q.tail; h = (h + 1) & modBits { + q.buf[h] = nil + } + q.head = 0 + q.tail = 0 + q.count = 0 +} + +// Rotate rotates the deque n steps front-to-back. If n is negative, rotates +// back-to-front. Having Deque provide Rotate() avoids resizing that could +// happen if implementing rotation using only Pop and Push methods. +func (q *Deque) Rotate(n int) { + if q.count <= 1 { + return + } + // Rotating a multiple of q.count is same as no rotation. + n %= q.count + if n == 0 { + return + } + + modBits := len(q.buf) - 1 + // If no empty space in buffer, only move head and tail indexes. + if q.head == q.tail { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + n) & modBits + q.tail = (q.tail + n) & modBits + return + } + + if n < 0 { + // Rotate back to front. + for ; n < 0; n++ { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head - 1) & modBits + q.tail = (q.tail - 1) & modBits + // Put tail value at head and remove value at tail. + q.buf[q.head] = q.buf[q.tail] + q.buf[q.tail] = nil + } + return + } + + // Rotate front to back. + for ; n > 0; n-- { + // Put head value at tail and remove value at head. + q.buf[q.tail] = q.buf[q.head] + q.buf[q.head] = nil + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + 1) & modBits + q.tail = (q.tail + 1) & modBits + } +} + +// SetMinCapacity sets a minimum capacity of 2^minCapacityExp. If the value of +// the minimum capacity is less than or equal to the minimum allowed, then +// capacity is set to the minimum allowed. This may be called at anytime to +// set a new minimum capacity. +// +// Setting a larger minimum capacity may be used to prevent resizing when the +// number of stored items changes frequently across a wide range. +func (q *Deque) SetMinCapacity(minCapacityExp uint) { + if 1< minCapacity { + q.minCap = 1 << minCapacityExp + } else { + q.minCap = minCapacity + } +} + +// prev returns the previous buffer position wrapping around buffer. +func (q *Deque) prev(i int) int { + return (i - 1) & (len(q.buf) - 1) // bitwise modulus +} + +// next returns the next buffer position wrapping around buffer. +func (q *Deque) next(i int) int { + return (i + 1) & (len(q.buf) - 1) // bitwise modulus +} + +// growIfFull resizes up if the buffer is full. +func (q *Deque) growIfFull() { + if len(q.buf) == 0 { + if q.minCap == 0 { + q.minCap = minCapacity + } + q.buf = make([]interface{}, q.minCap) + return + } + if q.count == len(q.buf) { + q.resize() + } +} + +// shrinkIfExcess resize down if the buffer 1/4 full. +func (q *Deque) shrinkIfExcess() { + if len(q.buf) > q.minCap && (q.count<<2) == len(q.buf) { + q.resize() + } +} + +// resize resizes the deque to fit exactly twice its current contents. This is +// used to grow the queue when it is full, and also to shrink it when it is +// only a quarter full. +func (q *Deque) resize() { + newBuf := make([]interface{}, q.count<<1) + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} diff --git a/vendor/github.com/gammazero/deque/doc.go b/vendor/github.com/gammazero/deque/doc.go new file mode 100644 index 000000000..c9647f983 --- /dev/null +++ b/vendor/github.com/gammazero/deque/doc.go @@ -0,0 +1,34 @@ +/* +Package deque provides a fast ring-buffer deque (double-ended queue) +implementation. + +Deque generalizes a queue and a stack, to efficiently add and remove items at +either end with O(1) performance. Queue (FIFO) operations are supported using +PushBack() and PopFront(). Stack (LIFO) operations are supported using +PushBack() and PopBack(). + +Ring-buffer Performance + +The ring-buffer automatically resizes by +powers of two, growing when additional capacity is needed and shrinking when +only a quarter of the capacity is used, and uses bitwise arithmetic for all +calculations. + +The ring-buffer implementation significantly improves memory and time +performance with fewer GC pauses, compared to implementations based on slices +and linked lists. + +For maximum speed, this deque implementation leaves concurrency safety up to +the application to provide, however the application chooses, if needed at all. + +Reading Empty Deque + +Since it is OK for the deque to contain a nil value, it is necessary to either +panic or return a second boolean value to indicate the deque is empty, when +reading or removing an element. This deque panics when reading from an empty +deque. This is a run-time check to help catch programming errors, which may be +missed if a second return value is ignored. Simply check Deque.Len() before +reading from the deque. + +*/ +package deque diff --git a/vendor/github.com/gammazero/deque/go.mod b/vendor/github.com/gammazero/deque/go.mod new file mode 100644 index 000000000..a63a33aec --- /dev/null +++ b/vendor/github.com/gammazero/deque/go.mod @@ -0,0 +1,3 @@ +module github.com/gammazero/deque + +go 1.12 diff --git a/vendor/github.com/gammazero/deque/go.test.sh b/vendor/github.com/gammazero/deque/go.test.sh new file mode 100644 index 000000000..494b176ba --- /dev/null +++ b/vendor/github.com/gammazero/deque/go.test.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -e +echo "" > coverage.txt + +for d in $(go list ./... | grep -v vendor); do + go test -coverprofile=profile.out -covermode=atomic $d + if [ -f profile.out ]; then + cat profile.out >> coverage.txt + rm profile.out + fi +done diff --git a/vendor/github.com/gammazero/workerpool/.gitignore b/vendor/github.com/gammazero/workerpool/.gitignore new file mode 100644 index 000000000..ff737f5aa --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/.gitignore @@ -0,0 +1,28 @@ +*~ + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof + +coverage.out diff --git a/vendor/github.com/gammazero/workerpool/.travis.yml b/vendor/github.com/gammazero/workerpool/.travis.yml new file mode 100644 index 000000000..69e6ab3b3 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/.travis.yml @@ -0,0 +1,15 @@ +language: go + +go: + - 1.13.x + - 1.14.x + - tip + +before_script: + - go vet ./... + +script: + - ./go.test.sh + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/gammazero/workerpool/LICENSE b/vendor/github.com/gammazero/workerpool/LICENSE new file mode 100644 index 000000000..f6ff6ce9d --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Andrew J. Gillis + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/gammazero/workerpool/README.md b/vendor/github.com/gammazero/workerpool/README.md new file mode 100644 index 000000000..bcf37d100 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/README.md @@ -0,0 +1,52 @@ +# workerpool +[![Build Status](https://travis-ci.com/gammazero/workerpool.svg)](https://travis-ci.com/gammazero/workerpool) +[![Go Report Card](https://goreportcard.com/badge/github.com/gammazero/workerpool)](https://goreportcard.com/report/github.com/gammazero/workerpool) +[![codecov](https://codecov.io/gh/gammazero/workerpool/branch/master/graph/badge.svg)](https://codecov.io/gh/gammazero/workerpool) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/gammazero/workerpool/blob/master/LICENSE) + +Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued. + +[![GoDoc](https://godoc.org/github.com/gammazero/workerpool?status.svg)](https://godoc.org/github.com/gammazero/workerpool) + +This implementation builds on ideas from the following: + +- http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang +- http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html + +## Installation +To install this package, you need to setup your Go workspace. The simplest way to install the library is to run: +``` +$ go get github.com/gammazero/workerpool +``` + +## Example +```go +package main + +import ( + "fmt" + "github.com/gammazero/workerpool" +) + +func main() { + wp := workerpool.New(2) + requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"} + + for _, r := range requests { + r := r + wp.Submit(func() { + fmt.Println("Handling request:", r) + }) + } + + wp.StopWait() +} +``` + +## Usage Note + +There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool, and should be solved by distributing load over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc. + +## Real world examples + +The list of open source projects using worker pool can be found [here](https://github.com/gammazero/workerpool/wiki#open-projects-using-workerpool) diff --git a/vendor/github.com/gammazero/workerpool/doc.go b/vendor/github.com/gammazero/workerpool/doc.go new file mode 100644 index 000000000..c189fde82 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/doc.go @@ -0,0 +1,66 @@ +/* +Package workerpool queues work to a limited number of goroutines. + +The purpose of the worker pool is to limit the concurrency of tasks +executed by the workers. This is useful when performing tasks that require +sufficient resources (CPU, memory, etc.), and running too many tasks at the +same time would exhaust resources. + +Non-blocking task submission + +A task is a function submitted to the worker pool for execution. Submitting +tasks to this worker pool will not block, regardless of the number of tasks. +Incoming tasks are immediately dispatched to an available +worker. If no worker is immediately available, or there are already tasks +waiting for an available worker, then the task is put on a waiting queue to +wait for an available worker. + +The intent of the worker pool is to limit the concurrency of task execution, +not limit the number of tasks queued to be executed. Therefore, this unbounded +input of tasks is acceptable as the tasks cannot be discarded. If the number +of inbound tasks is too many to even queue for pending processing, then the +solution is outside the scope of workerpool, and should be solved by +distributing load over multiple systems, and/or storing input for pending +processing in intermediate storage such as a database, file system, distributed +message queue, etc. + +Dispatcher + +This worker pool uses a single dispatcher goroutine to read tasks from the +input task queue and dispatch them to worker goroutines. This allows for a +small input channel, and lets the dispatcher queue as many tasks as are +submitted when there are no available workers. Additionally, the dispatcher +can adjust the number of workers as appropriate for the work load, without +having to utilize locked counters and checks incurred on task submission. + +When no tasks have been submitted for a period of time, a worker is removed by +the dispatcher. This is done until there are no more workers to remove. The +minimum number of workers is always zero, because the time to start new workers +is insignificant. + +Usage note + +It is advisable to use different worker pools for tasks that are bound by +different resources, or that have different resource use patterns. For +example, tasks that use X Mb of memory may need different concurrency limits +than tasks that use Y Mb of memory. + +Waiting queue vs goroutines + +When there are no available workers to handle incoming tasks, the tasks are put +on a waiting queue, in this implementation. In implementations mentioned in +the credits below, these tasks were passed to goroutines. Using a queue is +faster and has less memory overhead than creating a separate goroutine for each +waiting task, allowing a much higher number of waiting tasks. Also, using a +waiting queue ensures that tasks are given to workers in the order the tasks +were received. + +Credits + +This implementation builds on ideas from the following: + +http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang +http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html + +*/ +package workerpool diff --git a/vendor/github.com/gammazero/workerpool/go.mod b/vendor/github.com/gammazero/workerpool/go.mod new file mode 100644 index 000000000..b4fbf028f --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/go.mod @@ -0,0 +1,5 @@ +module github.com/gammazero/workerpool + +require github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 + +go 1.13 diff --git a/vendor/github.com/gammazero/workerpool/go.sum b/vendor/github.com/gammazero/workerpool/go.sum new file mode 100644 index 000000000..8335ef5f0 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/go.sum @@ -0,0 +1,8 @@ +github.com/gammazero/deque v0.0.0-20190515085511-50f5fc498a27 h1:eqgrF1fSzfhRQet8I3ws3MrGPLS7z2n1emuetE3yr2w= +github.com/gammazero/deque v0.0.0-20190515085511-50f5fc498a27/go.mod h1:GeIq9qoE43YdGnDXURnmKTnGg15pQz4mYkXSTChbneI= +github.com/gammazero/deque v0.0.0-20190521012701-46e4ffb7a622 h1:lxbhOGZ9pU3Kf8P6lFluUcE82yVZn2EqEf4+mWRNPV0= +github.com/gammazero/deque v0.0.0-20190521012701-46e4ffb7a622/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/deque v0.0.0-20200124200322-7e84b94275b8 h1:9eg1l6iiw3aLaU3P8RRoosiYCRoWDg8HzPiy8xzWKgg= +github.com/gammazero/deque v0.0.0-20200124200322-7e84b94275b8/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= diff --git a/vendor/github.com/gammazero/workerpool/go.test.sh b/vendor/github.com/gammazero/workerpool/go.test.sh new file mode 100644 index 000000000..34dbbfb31 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/go.test.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -e +echo "" > coverage.txt + +for d in $(go list ./... | grep -v vendor); do + go test -race -coverprofile=profile.out -covermode=atomic $d + if [ -f profile.out ]; then + cat profile.out >> coverage.txt + rm profile.out + fi +done diff --git a/vendor/github.com/gammazero/workerpool/workerpool.go b/vendor/github.com/gammazero/workerpool/workerpool.go new file mode 100644 index 000000000..7c2bb5c5c --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/workerpool.go @@ -0,0 +1,257 @@ +package workerpool + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/gammazero/deque" +) + +const ( + // If workes idle for at least this period of time, then stop a worker. + idleTimeout = 2 * time.Second +) + +// New creates and starts a pool of worker goroutines. +// +// The maxWorkers parameter specifies the maximum number of workers that can +// execute tasks concurrently. When there are no incoming tasks, workers are +// gradually stopped until there are no remaining workers. +func New(maxWorkers int) *WorkerPool { + // There must be at least one worker. + if maxWorkers < 1 { + maxWorkers = 1 + } + + pool := &WorkerPool{ + maxWorkers: maxWorkers, + taskQueue: make(chan func(), 1), + workerQueue: make(chan func()), + stoppedChan: make(chan struct{}), + } + + // Start the task dispatcher. + go pool.dispatch() + + return pool +} + +// WorkerPool is a collection of goroutines, where the number of concurrent +// goroutines processing requests does not exceed the specified maximum. +type WorkerPool struct { + maxWorkers int + taskQueue chan func() + workerQueue chan func() + stoppedChan chan struct{} + waitingQueue deque.Deque + stopOnce sync.Once + stopped int32 + waiting int32 + wait bool +} + +// Size returns the maximum number of concurrent workers. +func (p *WorkerPool) Size() int { + return p.maxWorkers +} + +// Stop stops the worker pool and waits for only currently running tasks to +// complete. Pending tasks that are not currently running are abandoned. +// Tasks must not be submitted to the worker pool after calling stop. +// +// Since creating the worker pool starts at least one goroutine, for the +// dispatcher, Stop() or StopWait() should be called when the worker pool is no +// longer needed. +func (p *WorkerPool) Stop() { + p.stop(false) +} + +// StopWait stops the worker pool and waits for all queued tasks tasks to +// complete. No additional tasks may be submitted, but all pending tasks are +// executed by workers before this function returns. +func (p *WorkerPool) StopWait() { + p.stop(true) +} + +// Stopped returns true if this worker pool has been stopped. +func (p *WorkerPool) Stopped() bool { + return atomic.LoadInt32(&p.stopped) != 0 +} + +// Submit enqueues a function for a worker to execute. +// +// Any external values needed by the task function must be captured in a +// closure. Any return values should be returned over a channel that is +// captured in the task function closure. +// +// Submit will not block regardless of the number of tasks submitted. Each +// task is immediately given to an available worker or to a newly started +// worker. If there are no available workers, and the maximum number of +// workers are already created, then the task is put onto a waiting queue. +// +// When there are tasks on the waiting queue, any additional new tasks are put +// on the waiting queue. Tasks are removed from the waiting queue as workers +// become available. +// +// As long as no new tasks arrive, one available worker is shutdown each time +// period until there are no more idle workers. Since the time to start new +// goroutines is not significant, there is no need to retain idle workers +// indefinitely. +func (p *WorkerPool) Submit(task func()) { + if task != nil { + p.taskQueue <- task + } +} + +// SubmitWait enqueues the given function and waits for it to be executed. +func (p *WorkerPool) SubmitWait(task func()) { + if task == nil { + return + } + doneChan := make(chan struct{}) + p.taskQueue <- func() { + task() + close(doneChan) + } + <-doneChan +} + +// WaitingQueueSize returns the count of tasks in the waiting queue. +func (p *WorkerPool) WaitingQueueSize() int { + return int(atomic.LoadInt32(&p.waiting)) +} + +// dispatch sends the next queued task to an available worker. +func (p *WorkerPool) dispatch() { + defer close(p.stoppedChan) + timeout := time.NewTimer(idleTimeout) + var workerCount int + var idle bool + +Loop: + for { + // As long as tasks are in the waiting queue, incoming tasks are put + // into the waiting queue and tasks to run are taken from the waiting + // queue. Once the waiting queue is empty, then go back to submitting + // incoming tasks directly to available workers. + if p.waitingQueue.Len() != 0 { + if !p.processWaitingQueue() { + break Loop + } + continue + } + + select { + case task, ok := <-p.taskQueue: + if !ok { + break Loop + } + // Got a task to do. + select { + case p.workerQueue <- task: + default: + // Create a new worker, if not at max. + if workerCount < p.maxWorkers { + go startWorker(task, p.workerQueue) + workerCount++ + } else { + // Enqueue task to be executed by next available worker. + p.waitingQueue.PushBack(task) + atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) + } + } + idle = false + case <-timeout.C: + // Timed out waiting for work to arrive. Kill a ready worker if + // pool has been idle for a whole timeout. + if idle && workerCount > 0 { + if p.killIdleWorker() { + workerCount-- + } + } + idle = true + timeout.Reset(idleTimeout) + } + } + + // If instructed to wait, then run tasks that are already queued. + if p.wait { + p.runQueuedTasks() + } + + // Stop all remaining workers as they become ready. + for workerCount > 0 { + p.workerQueue <- nil + workerCount-- + } + + timeout.Stop() +} + +// startWorker runs initial task, then starts a worker waiting for more. +func startWorker(task func(), workerQueue chan func()) { + task() + go worker(workerQueue) +} + +// worker executes tasks and stops when it receives a nil task. +func worker(workerQueue chan func()) { + for task := range workerQueue { + if task == nil { + return + } + task() + } +} + +// stop tells the dispatcher to exit, and whether or not to complete queued +// tasks. +func (p *WorkerPool) stop(wait bool) { + p.stopOnce.Do(func() { + atomic.StoreInt32(&p.stopped, 1) + p.wait = wait + // Close task queue and wait for currently running tasks to finish. + close(p.taskQueue) + }) + <-p.stoppedChan +} + +// processWaitingQueue puts new tasks onto the the waiting queue, and removes +// tasks from the waiting queue as workers become available. Returns false if +// worker pool is stopped. +func (p *WorkerPool) processWaitingQueue() bool { + select { + case task, ok := <-p.taskQueue: + if !ok { + return false + } + p.waitingQueue.PushBack(task) + case p.workerQueue <- p.waitingQueue.Front().(func()): + // A worker was ready, so gave task to worker. + p.waitingQueue.PopFront() + } + atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) + return true +} + +func (p *WorkerPool) killIdleWorker() bool { + select { + case p.workerQueue <- nil: + // Sent kill signal to worker. + return true + default: + // No ready workers. All, if any, workers are busy. + return false + } +} + +// runQueuedTasks removes each task from the waiting queue and gives it to +// workers until queue is empty. +func (p *WorkerPool) runQueuedTasks() { + for p.waitingQueue.Len() != 0 { + // A worker is ready, so give task to worker. + p.workerQueue <- p.waitingQueue.PopFront().(func()) + atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c0fab447b..ab543b102 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -21,6 +21,10 @@ github.com/creachadair/jrpc2/server github.com/fatih/color # github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify +# github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 +github.com/gammazero/deque +# github.com/gammazero/workerpool v1.0.0 +github.com/gammazero/workerpool # github.com/google/go-cmp v0.4.1 github.com/google/go-cmp/cmp github.com/google/go-cmp/cmp/cmpopts