Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have a configurable maximum active work per peer #10

Merged
merged 2 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
maxOutstandingWorkPerPeer int
}

// Option is a function that configures the peer task queue
Expand Down Expand Up @@ -62,6 +63,16 @@ func TaskMerger(tmfp peertracker.TaskMerger) Option {
}
}

// MaxOutstandingWorkPerPeer is an option that specifies how many tasks a peer can have outstanding
// with the same Topic as an existing Topic.
func MaxOutstandingWorkPerPeer(count int) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.maxOutstandingWorkPerPeer
ptq.maxOutstandingWorkPerPeer = count
return MaxOutstandingWorkPerPeer(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
Expand Down Expand Up @@ -139,7 +150,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to, ptq.taskMerger)
peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
Expand Down
25 changes: 19 additions & 6 deletions peertracker/peertracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type PeerTracker struct {
activelk sync.Mutex
activeWork int

maxActiveWorkPerPeer int

// for the PQ interface
index int

Expand All @@ -57,13 +59,14 @@ type PeerTracker struct {
}

// New creates a new PeerTracker
func New(target peer.ID, taskMerger TaskMerger) *PeerTracker {
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker {
return &PeerTracker{
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
maxActiveWorkPerPeer: maxActiveWorkPerPeer,
}
}

Expand Down Expand Up @@ -172,6 +175,16 @@ func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
var out []*peertask.Task
work := 0
for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork {
if p.maxActiveWorkPerPeer > 0 {
// Do not add work to a peer that is already maxed out
p.activelk.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: I think we need to revisit this lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But not now.

activeWork := p.activeWork
p.activelk.Unlock()
if activeWork >= p.maxActiveWorkPerPeer {
break
}
}

// Pop the next task off the queue
t := p.taskQueue.Pop().(*peertask.QueueTask)

Expand Down
32 changes: 17 additions & 15 deletions peertracker/peertracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"github.com/ipfs/go-peertaskqueue/testutil"
)

const testMaxActiveWorkPerPeer = 100

func TestEmpty(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks, _ := tracker.PopTasks(100)
if len(tasks) != 0 {
Expand All @@ -19,7 +21,7 @@ func TestEmpty(t *testing.T) {

func TestPushPop(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand All @@ -40,7 +42,7 @@ func TestPushPop(t *testing.T) {

func TestPopNegativeOrZeroSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand All @@ -62,7 +64,7 @@ func TestPopNegativeOrZeroSize(t *testing.T) {

func TestPushPopSizeAndOrder(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -116,7 +118,7 @@ func TestPushPopSizeAndOrder(t *testing.T) {

func TestPopFirstItemAlways(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -147,7 +149,7 @@ func TestPopFirstItemAlways(t *testing.T) {

func TestPopItemsToCoverTargetWork(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestPopItemsToCoverTargetWork(t *testing.T) {

func TestRemove(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -215,7 +217,7 @@ func TestRemove(t *testing.T) {

func TestRemoveMulti(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -247,7 +249,7 @@ func TestRemoveMulti(t *testing.T) {

func TestTaskDone(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -299,7 +301,7 @@ func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task)

func TestReplaceTaskPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -338,7 +340,7 @@ func TestReplaceTaskPermissive(t *testing.T) {

func TestReplaceTaskSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -391,7 +393,7 @@ func TestReplaceTaskSize(t *testing.T) {

func TestReplaceActiveTask(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -430,7 +432,7 @@ func TestReplaceActiveTask(t *testing.T) {

func TestReplaceActiveTaskNonPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -468,7 +470,7 @@ func TestReplaceActiveTaskNonPermissive(t *testing.T) {

func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -519,7 +521,7 @@ func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {

func TestRemoveActive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down