Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
48bc817
Initial commit
vito Jan 7, 2015
bd92699
init
Jan 7, 2015
508e9da
queue is now a sliding queue
Jan 9, 2015
d5c9f46
Remove when POPping off the Queue
atulkc Jan 13, 2015
295074b
Change Ω to Expect [#93246926]
atulkc Apr 28, 2015
5a144f9
Move operationq to code.cloudfoundry.org
nimakaviani Jun 30, 2016
5bd284f
Add note about code.cloudfoundry.org to README
luan Jun 30, 2016
f612cf6
Fix flaky test
jfmyers9 Jun 30, 2016
4f9dafb
add NOTICE
jfmyers9 Jul 5, 2016
f80c619
Enforce canonical import path
caod123 Feb 6, 2017
7de3db6
Update LICENSE and NOTICE
emalm Feb 23, 2017
aaa8e99
Update README with URL to diego-release issues
mariash Jan 21, 2020
da3eb22
Update PR template
mariash Jan 25, 2020
8a7157d
content change
heyjcollins Jan 30, 2020
92b5466
Update PULL_REQUEST_TEMPLATE.md
heyjcollins Jan 31, 2020
95799f8
Ginkgo v2: don't use Done channels
ebroberson Aug 4, 2022
78a6732
Fix the test for max capacity
reneighbor Mar 24, 2023
4522d08
Bump to ginkgo/v2 (#2)
winkingturtle-vmw Apr 6, 2023
b625ea3
Add CODEOWNERS file in preparation for branch protection rules (#3)
geofffranks Jun 12, 2023
a959afa
Sync .github dir templates
tas-runtime-bot Apr 8, 2024
bf08b68
Sync README.md
tas-runtime-bot Oct 1, 2024
c91f584
Sync README.md
tas-runtime-bot Oct 25, 2024
728db98
Sync README.md
tas-runtime-bot Oct 26, 2024
ce3c8a0
Sync README.md
tas-runtime-bot Oct 29, 2024
8c51688
Sync README.md
tas-runtime-bot Dec 10, 2024
95c1da6
remove operationq submodule entry
kart2bc Aug 26, 2025
cdbf09c
Add 'src/code.cloudfoundry.org/operationq/' from commit '8c516880116a…
kart2bc Aug 26, 2025
c4326ff
Inline submodule operationq into main repo
kart2bc Aug 26, 2025
e302717
Merge branch 'develop' into operationq-inline-mod
ameowlia Sep 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
path = src/code.cloudfoundry.org/buildpackapplifecycle
url = https://github.com/cloudfoundry/buildpackapplifecycle
branch = main
[submodule "src/code.cloudfoundry.org/operationq"]
path = src/code.cloudfoundry.org/operationq
url = https://github.com/cloudfoundry/operationq
branch = main
[submodule "src/code.cloudfoundry.org/localdriver"]
path = src/code.cloudfoundry.org/localdriver
url = https://github.com/cloudfoundry/localdriver
Expand Down
1 change: 0 additions & 1 deletion src/code.cloudfoundry.org/operationq
Submodule operationq deleted from 8c5168
24 changes: 24 additions & 0 deletions src/code.cloudfoundry.org/operationq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test

# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe
*.test
*.prof
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This file was generated by counterfeiter
package fake_operationq

import (
"sync"

"code.cloudfoundry.org/operationq"
)

type FakeOperation struct {
KeyStub func() string
keyMutex sync.RWMutex
keyArgsForCall []struct{}
keyReturns struct {
result1 string
}
ExecuteStub func()
executeMutex sync.RWMutex
executeArgsForCall []struct{}
}

func (fake *FakeOperation) Key() string {
fake.keyMutex.Lock()
fake.keyArgsForCall = append(fake.keyArgsForCall, struct{}{})
fake.keyMutex.Unlock()
if fake.KeyStub != nil {
return fake.KeyStub()
} else {
return fake.keyReturns.result1
}
}

func (fake *FakeOperation) KeyCallCount() int {
fake.keyMutex.RLock()
defer fake.keyMutex.RUnlock()
return len(fake.keyArgsForCall)
}

func (fake *FakeOperation) KeyReturns(result1 string) {
fake.KeyStub = nil
fake.keyReturns = struct {
result1 string
}{result1}
}

func (fake *FakeOperation) Execute() {
fake.executeMutex.Lock()
fake.executeArgsForCall = append(fake.executeArgsForCall, struct{}{})
fake.executeMutex.Unlock()
if fake.ExecuteStub != nil {
fake.ExecuteStub()
}
}

func (fake *FakeOperation) ExecuteCallCount() int {
fake.executeMutex.RLock()
defer fake.executeMutex.RUnlock()
return len(fake.executeArgsForCall)
}

var _ operationq.Operation = new(FakeOperation)
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// This file was generated by counterfeiter
package fake_operationq

import (
"sync"

"code.cloudfoundry.org/operationq"
)

type FakeQueue struct {
PushStub func(operationq.Operation)
pushMutex sync.RWMutex
pushArgsForCall []struct {
arg1 operationq.Operation
}
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct{}
WaitStub func()
waitMutex sync.RWMutex
waitArgsForCall []struct{}
}

func (fake *FakeQueue) Push(arg1 operationq.Operation) {
fake.pushMutex.Lock()
fake.pushArgsForCall = append(fake.pushArgsForCall, struct {
arg1 operationq.Operation
}{arg1})
fake.pushMutex.Unlock()
if fake.PushStub != nil {
fake.PushStub(arg1)
}
}

func (fake *FakeQueue) PushCallCount() int {
fake.pushMutex.RLock()
defer fake.pushMutex.RUnlock()
return len(fake.pushArgsForCall)
}

func (fake *FakeQueue) PushArgsForCall(i int) operationq.Operation {
fake.pushMutex.RLock()
defer fake.pushMutex.RUnlock()
return fake.pushArgsForCall[i].arg1
}

func (fake *FakeQueue) Close() {
fake.closeMutex.Lock()
fake.closeArgsForCall = append(fake.closeArgsForCall, struct{}{})
fake.closeMutex.Unlock()
if fake.CloseStub != nil {
fake.CloseStub()
}
}

func (fake *FakeQueue) CloseCallCount() int {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
return len(fake.closeArgsForCall)
}

func (fake *FakeQueue) Wait() {
fake.waitMutex.Lock()
fake.waitArgsForCall = append(fake.waitArgsForCall, struct{}{})
fake.waitMutex.Unlock()
if fake.WaitStub != nil {
fake.WaitStub()
}
}

func (fake *FakeQueue) WaitCallCount() int {
fake.waitMutex.RLock()
defer fake.waitMutex.RUnlock()
return len(fake.waitArgsForCall)
}

var _ operationq.Queue = new(FakeQueue)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package fake_operationq // import "code.cloudfoundry.org/operationq/fake_operationq"
13 changes: 13 additions & 0 deletions src/code.cloudfoundry.org/operationq/operationq_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package operationq_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"testing"
)

func TestOperationq(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Operationq Suite")
}
1 change: 1 addition & 0 deletions src/code.cloudfoundry.org/operationq/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package operationq // import "code.cloudfoundry.org/operationq"
125 changes: 125 additions & 0 deletions src/code.cloudfoundry.org/operationq/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// a parallel operation queue.
package operationq

import (
"container/list"
)

//go:generate counterfeiter -o fake_operationq/fake_operation.go . Operation

// The Operation interface is implemented externally, by the user of the queue.
type Operation interface {
// Identifier for the operation's queue. Operations with the same key will be
// executed in the order in which they were pushed. Operations with different
// keys will be executed concurrently.
Key() string

// Work to execute when the operation is popped off of the queue.
Execute()
}

//go:generate counterfeiter -o fake_operationq/fake_queue.go . Queue

// Queue executes operations, parallelized by operation key.
type Queue interface {
// Enqueue an operation for execution.
Push(Operation)
}

type buffer interface {
Push(Operation) bool
Pop() (Operation, bool)
Len() int
}

type slidingBuffer struct {
buffer *list.List
capacity int
}

func newSlidingBuffer(capacity int) buffer {
return &slidingBuffer{
buffer: list.New(),
capacity: capacity,
}
}

func (b *slidingBuffer) Push(op Operation) bool {
if b.capacity == 0 {
return false
}

b.buffer.PushBack(op)
if b.buffer.Len() > b.capacity {
b.buffer.Remove(b.buffer.Front())
}
return true
}

func (b *slidingBuffer) Pop() (Operation, bool) {
elem := b.buffer.Front()
if elem == nil {
return nil, false
}

b.buffer.Remove(elem)
return elem.Value.(Operation), true
}

func (b *slidingBuffer) Len() int {
return b.buffer.Len()
}

type multiQueue struct {
queues map[string]buffer
pushChan chan Operation
completeChan chan string
capacity int
}

// NewSlidingQueue returns a queue that will buffer up to `capacity` operations
// per key. When capacity is exceeded, older operations are dequeued to make room.
func NewSlidingQueue(capacity int) Queue {
q := &multiQueue{
queues: make(map[string]buffer),
pushChan: make(chan Operation),
completeChan: make(chan string),
capacity: capacity,
}
go q.run()
return q
}

func (q *multiQueue) run() {
for {
select {
case queueKey := <-q.completeChan:
queue := q.queues[queueKey]
if queue.Len() == 0 {
delete(q.queues, queueKey)
} else {
op, ok := queue.Pop()
if ok {
go q.execute(op)
}
}

case op := <-q.pushChan:
if queue, ok := q.queues[op.Key()]; ok {
queue.Push(op)
} else {
q.queues[op.Key()] = newSlidingBuffer(q.capacity)
go q.execute(op)
}
}
}
}

func (q *multiQueue) Push(o Operation) {
q.pushChan <- o
}

func (q *multiQueue) execute(o Operation) {
o.Execute()
q.completeChan <- o.Key()
}
Loading