From 1c1501dd60691d37d23d88f7ae8dc2b296b4b6fc Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 20:35:31 -0400 Subject: [PATCH 01/18] Refactor event queue --- snow/engine/snowman/event/queue.go | 96 +++++++ snow/engine/snowman/event/queue_test.go | 316 ++++++++++++++++++++++++ snow/engine/snowman/issuer.go | 37 +-- snow/engine/snowman/transitive.go | 102 ++++---- snow/engine/snowman/transitive_test.go | 22 +- snow/engine/snowman/voter.go | 50 ++-- snow/event/blockable.go | 24 -- snow/event/blocker.go | 92 ------- snow/event/blocker_test.go | 116 --------- 9 files changed, 500 insertions(+), 355 deletions(-) create mode 100644 snow/engine/snowman/event/queue.go create mode 100644 snow/engine/snowman/event/queue_test.go delete mode 100644 snow/event/blockable.go delete mode 100644 snow/event/blocker.go delete mode 100644 snow/event/blocker_test.go diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/event/queue.go new file mode 100644 index 00000000000..9f73b8cba8c --- /dev/null +++ b/snow/engine/snowman/event/queue.go @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package event + +import ( + "context" + + "github.com/ava-labs/avalanchego/utils/set" +) + +type Job interface { + Execute(context.Context) error + Cancel(context.Context) error +} + +type job[T comparable] struct { + dependencies set.Set[T] + job Job +} + +type Queue[T comparable] struct { + jobs map[T][]*job[T] +} + +func NewQueue[T comparable]() *Queue[T] { + return &Queue[T]{ + jobs: make(map[T][]*job[T]), + } +} + +func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { + if len(dependencies) == 0 { + return userJob.Execute(ctx) + } + + j := &job[T]{ + dependencies: set.Of(dependencies...), + job: userJob, + } + for _, dependency := range dependencies { + q.jobs[dependency] = append(q.jobs[dependency], j) + } + return nil +} + +func (q *Queue[_]) Len() int { + return len(q.jobs) +} + +func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { + jobs := q.jobs[dependency] + delete(q.jobs, dependency) + + for _, job := range jobs { + job.dependencies.Remove(dependency) + + userJob := job.job + if userJob == nil || job.dependencies.Len() != 0 { + continue + } + + // If the job was registered with duplicate dependencies, it may be + // possible for the job to be executed multiple times. To prevent this, + // we clear the job. + job.job = nil + + if err := userJob.Execute(ctx); err != nil { + return err + } + } + return nil +} + +func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { + jobs := q.jobs[dependency] + delete(q.jobs, dependency) + + for _, job := range jobs { + job.dependencies.Remove(dependency) + + userJob := job.job + if userJob == nil { + continue + } + + // Mark the job as cancelled so that any reentrant calls do not interact + // with this job again. 
+ job.job = nil + + if err := userJob.Cancel(ctx); err != nil { + return err + } + } + return nil +} diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go new file mode 100644 index 00000000000..58df4ce9266 --- /dev/null +++ b/snow/engine/snowman/event/queue_test.go @@ -0,0 +1,316 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package event + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/utils/set" +) + +var ( + errDuplicateInvocation = errors.New("job already handled") + errUnexpectedInvocation = errors.New("job handled unexpectedly") +) + +type testJob struct { + execute func(context.Context) error + cancel func(context.Context) error +} + +func (j *testJob) Execute(ctx context.Context) error { + return j.execute(ctx) +} + +func (j *testJob) Cancel(ctx context.Context) error { + return j.cancel(ctx) +} + +func newQueueWithJob[T comparable](t *testing.T, job Job, dependencies ...T) *Queue[T] { + q := NewQueue[T]() + require.NoError(t, q.Register(context.Background(), job, dependencies...)) + return q +} + +func TestQueue_Fib(t *testing.T) { + var ( + require = require.New(t) + results []int + q = NewQueue[int]() + makeJob = func(n int) *testJob { + return &testJob{ + execute: func(ctx context.Context) error { + if n < 2 { + results = append(results, 1) + } else { + results = append(results, results[n-2]+results[n-1]) + } + return q.Fulfill(ctx, n) + }, + cancel: func(context.Context) error { return nil }, + } + } + ) + for i := 9; i >= 2; i-- { + require.NoError(q.Register(context.Background(), makeJob(i), i-2, i-1)) + } + require.Empty(results) + require.NoError(q.Register(context.Background(), makeJob(0))) + require.Equal([]int{1}, results) + require.NoError(q.Register(context.Background(), makeJob(1))) + require.Equal([]int{1, 1, 2, 3, 5, 8, 13, 21, 34, 55}, results) +} + 
+func TestQueue_Register(t *testing.T) { + var calledExecute bool + userJob := &testJob{ + execute: func(context.Context) error { + if calledExecute { + return errDuplicateInvocation + } + calledExecute = true + return nil + }, + cancel: func(context.Context) error { + return errUnexpectedInvocation + }, + } + + tests := []struct { + name string + queue *Queue[int] + dependencies []int + shouldExecute bool + expectedLen int + expectedQueue *Queue[int] + }{ + { + name: "no dependencies", + queue: NewQueue[int](), + dependencies: nil, + shouldExecute: true, + expectedLen: 0, + expectedQueue: NewQueue[int](), + }, + { + name: "one dependency", + queue: NewQueue[int](), + dependencies: []int{1}, + shouldExecute: false, + expectedLen: 1, + expectedQueue: &Queue[int]{ + jobs: map[int][]*job[int]{ + 1: { + { + dependencies: set.Of(1), + job: userJob, + }, + }, + }, + }, + }, + { + name: "two dependencies", + queue: NewQueue[int](), + dependencies: []int{1, 2}, + shouldExecute: false, + expectedLen: 2, + expectedQueue: &Queue[int]{ + jobs: map[int][]*job[int]{ + 1: { + { + dependencies: set.Of(1, 2), + job: userJob, + }, + }, + 2: { + { + dependencies: set.Of(1, 2), + job: userJob, + }, + }, + }, + }, + }, + { + name: "additional dependency", + queue: &Queue[int]{ + jobs: map[int][]*job[int]{ + 1: { + { + dependencies: set.Of(1), + job: userJob, + }, + }, + }, + }, + dependencies: []int{1}, + shouldExecute: false, + expectedLen: 1, + expectedQueue: &Queue[int]{ + jobs: map[int][]*job[int]{ + 1: { + { + dependencies: set.Of(1), + job: userJob, + }, + { + dependencies: set.Of(1), + job: userJob, + }, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between tests + calledExecute = false + + require.NoError(test.queue.Register(context.Background(), userJob, test.dependencies...)) + require.Equal(test.expectedLen, test.queue.Len()) + require.Equal(test.shouldExecute, 
calledExecute) + require.Equal(test.expectedQueue, test.queue) + }) + } +} + +func TestQueue_Fulfill(t *testing.T) { + var calledExecute bool + userJob := &testJob{ + execute: func(context.Context) error { + if calledExecute { + return errDuplicateInvocation + } + calledExecute = true + return nil + }, + cancel: func(context.Context) error { + return errUnexpectedInvocation + }, + } + + tests := []struct { + name string + queue *Queue[int] + shouldExecute bool + expectedQueue *Queue[int] + }{ + { + name: "no jobs", + queue: NewQueue[int](), + shouldExecute: false, + expectedQueue: NewQueue[int](), + }, + { + name: "single dependency", + queue: newQueueWithJob(t, userJob, 1), + shouldExecute: true, + expectedQueue: NewQueue[int](), + }, + { + name: "non-existent dependency", + queue: newQueueWithJob(t, userJob, 2), + shouldExecute: false, + expectedQueue: newQueueWithJob(t, userJob, 2), + }, + { + name: "incomplete dependencies", + queue: newQueueWithJob(t, userJob, 1, 2), + shouldExecute: false, + expectedQueue: newQueueWithJob(t, userJob, 2), + }, + { + name: "duplicate dependency", + queue: newQueueWithJob(t, userJob, 1, 1), + shouldExecute: true, + expectedQueue: NewQueue[int](), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between tests + calledExecute = false + + require.NoError(test.queue.Fulfill(context.Background(), 1)) + require.Equal(test.shouldExecute, calledExecute) + require.Equal(test.expectedQueue, test.queue) + }) + } +} + +func TestQueue_Abandon(t *testing.T) { + var calledAbandon bool + userJob := &testJob{ + execute: func(context.Context) error { + return errUnexpectedInvocation + }, + cancel: func(context.Context) error { + if calledAbandon { + return errDuplicateInvocation + } + calledAbandon = true + return nil + }, + } + + tests := []struct { + name string + queue *Queue[int] + shouldCancel bool + expectedQueue *Queue[int] + }{ + { + name: "no jobs", + 
queue: NewQueue[int](), + shouldCancel: false, + expectedQueue: NewQueue[int](), + }, + { + name: "single dependency", + queue: newQueueWithJob(t, userJob, 1), + shouldCancel: true, + expectedQueue: NewQueue[int](), + }, + { + name: "non-existent dependency", + queue: newQueueWithJob(t, userJob, 2), + shouldCancel: false, + expectedQueue: newQueueWithJob(t, userJob, 2), + }, + { + name: "incomplete dependencies", + queue: newQueueWithJob(t, userJob, 1, 2), + shouldCancel: true, + expectedQueue: newQueueWithJob(t, nil, 2), + }, + { + name: "duplicate dependency", + queue: newQueueWithJob(t, userJob, 1, 1), + shouldCancel: true, + expectedQueue: NewQueue[int](), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between tests + calledAbandon = false + + require.NoError(test.queue.Abandon(context.Background(), 1)) + require.Equal(test.shouldCancel, calledAbandon) + require.Equal(test.expectedQueue, test.queue) + }) + } +} diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index b3677d3cc21..2ccaa99e510 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -10,7 +10,6 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/utils/set" ) // issuer issues [blk] into to consensus after its dependencies are met. 
@@ -18,37 +17,17 @@ type issuer struct { t *Transitive nodeID ids.NodeID // nodeID of the peer that provided this block blk snowman.Block - issuedMetric prometheus.Counter - abandoned bool - deps set.Set[ids.ID] push bool + issuedMetric prometheus.Counter } -func (i *issuer) Dependencies() set.Set[ids.ID] { - return i.deps -} - -// Mark that a dependency has been met -func (i *issuer) Fulfill(ctx context.Context, id ids.ID) { - i.deps.Remove(id) - i.Update(ctx) -} - -// Abandon the attempt to issue [i.block] -func (i *issuer) Abandon(ctx context.Context, _ ids.ID) { - if !i.abandoned { - blkID := i.blk.ID() - i.t.removeFromPending(i.blk) - i.t.addToNonVerifieds(i.blk) - i.t.blocked.Abandon(ctx, blkID) - } - i.abandoned = true +func (i *issuer) Execute(ctx context.Context) error { + return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) } -func (i *issuer) Update(ctx context.Context) { - if i.abandoned || i.deps.Len() != 0 || i.t.errs.Errored() { - return - } - // Issue the block into consensus - i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)) +func (i *issuer) Cancel(ctx context.Context) error { + blkID := i.blk.ID() + i.t.removeFromPending(i.blk) + i.t.addToNonVerifieds(i.blk) + return i.t.blocked.Abandon(ctx, blkID) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 5164957f953..762e6763fde 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -21,7 +21,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/ancestor" - "github.com/ava-labs/avalanchego/snow/event" + "github.com/ava-labs/avalanchego/snow/engine/snowman/event" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/bag" "github.com/ava-labs/avalanchego/utils/bimap" @@ -30,7 +30,6 @@ import ( "github.com/ava-labs/avalanchego/utils/math" 
"github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/units" - "github.com/ava-labs/avalanchego/utils/wrappers" ) const nonVerifiedCacheSize = 64 * units.MiB @@ -83,14 +82,11 @@ type Transitive struct { // operations that are blocked on a block being issued. This could be // issuing another block, responding to a query, or applying votes to consensus - blocked event.Blocker + blocked *event.Queue[ids.ID] // number of times build block needs to be called once the number of // processing blocks has gone below the optimal number. pendingBuildBlocks int - - // errs tracks if an error has occurred in a callback - errs wrappers.Errs } func New(config Config) (*Transitive, error) { @@ -147,6 +143,7 @@ func New(config Config) (*Transitive, error) { nonVerifieds: ancestor.NewTree(), nonVerifiedCache: nonVerifiedCache, acceptedFrontiers: acceptedFrontiers, + blocked: event.NewQueue[ids.ID](), polls: polls, blkReqs: bimap.New[common.Request, ids.ID](), blkReqSourceMetric: make(map[common.Request]prometheus.Counter), @@ -293,8 +290,11 @@ func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID } delete(t.blkReqSourceMetric, req) - // Because the get request was dropped, we no longer expect blkID to be issued. - t.blocked.Abandon(ctx, blkID) + // Because the get request was dropped, we no longer expect blkID to be + // issued. + if err := t.blocked.Abandon(ctx, blkID); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -391,21 +391,24 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin // issued into consensus v := &voter{ t: t, - vdr: nodeID, + nodeID: nodeID, requestID: requestID, responseOptions: responseOptions, } // Wait until [preferredID] and [preferredIDAtHeight] have been issued to // consensus before applying this chit. 
+ var deps []ids.ID if !addedPreferred { - v.deps.Add(preferredID) + deps = append(deps, preferredID) } if !addedPreferredIDAtHeight { - v.deps.Add(preferredIDAtHeight) + deps = append(deps, preferredIDAtHeight) } - t.blocked.Register(ctx, v) + if err := t.blocked.Register(ctx, v, deps...); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -415,14 +418,14 @@ func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, request return t.Chits(ctx, nodeID, requestID, lastAccepted, lastAccepted, lastAccepted) } - t.blocked.Register( - ctx, - &voter{ - t: t, - vdr: nodeID, - requestID: requestID, - }, - ) + v := &voter{ + t: t, + nodeID: nodeID, + requestID: requestID, + } + if err := t.blocked.Register(ctx, v); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -534,7 +537,6 @@ func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) { zap.Uint32("requestID", t.requestID), zap.Stringer("polls", t.polls), zap.Reflect("outstandingBlockRequests", t.blkReqs), - zap.Stringer("blockedJobs", &t.blocked), zap.Int("pendingBuildBlocks", t.pendingBuildBlocks), ) @@ -646,9 +648,6 @@ func (t *Transitive) sendChits(ctx context.Context, nodeID ids.NodeID, requestID // Build blocks if they have been requested and the number of processing blocks // is less than optimal. func (t *Transitive) buildBlocks(ctx context.Context) error { - if err := t.errs.Err; err != nil { - return err - } for t.pendingBuildBlocks > 0 && t.Consensus.NumProcessing() < t.Params.OptimalProcessing { t.pendingBuildBlocks-- @@ -759,13 +758,14 @@ func (t *Transitive) issueFrom( } issued := t.Consensus.Decided(blk) || t.Consensus.Processing(blkID) - if issued { - // A dependency should never be waiting on a decided or processing - // block. However, if the block was marked as rejected by the VM, the - // dependencies may still be waiting. Therefore, they should abandoned. 
- t.blocked.Abandon(ctx, blkID) + if !issued { + return false, nil } - return issued, t.errs.Err + + // A dependency should never be waiting on a decided or processing block. + // However, if the block was marked as rejected by the VM, the dependencies + // may still be waiting. Therefore, they should be abandoned. + return true, t.blocked.Abandon(ctx, blkID) } // issueWithAncestors attempts to issue the branch ending with [blk] to consensus. @@ -806,8 +806,7 @@ func (t *Transitive) issueWithAncestors( // We don't have this block and have no reason to expect that we will get it. // Abandon the block to avoid a memory leak. - t.blocked.Abandon(ctx, blkID) - return false, t.errs.Err + return false, t.blocked.Abandon(ctx, blkID) } // If the block has been decided, then it is marked as having been issued. @@ -843,22 +842,24 @@ func (t *Transitive) issue( t: t, nodeID: nodeID, blk: blk, - issuedMetric: issuedMetric, push: push, + issuedMetric: issuedMetric, } // block on the parent if needed - parentID := blk.Parent() + var ( + parentID = blk.Parent() + deps []ids.ID + ) if parent, err := t.getBlock(ctx, parentID); err != nil || !(t.Consensus.Decided(parent) || t.Consensus.Processing(parentID)) { t.Ctx.Log.Verbo("block waiting for parent to be issued", zap.Stringer("blkID", blkID), zap.Stringer("parentID", parentID), ) - i.deps.Add(parentID) + deps = append(deps, parentID) } - t.blocked.Register(ctx, i) - return t.errs.Err + return t.blocked.Register(ctx, i, deps...) } // Request that [vdr] send us block [blkID] @@ -962,20 +963,18 @@ func (t *Transitive) deliver( // If [blk] is decided, then it shouldn't be added to consensus. // Similarly, if [blkID] is already in the processing set, it shouldn't // be added to consensus again.
- t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } parentID := blk.Parent() parent, err := t.getBlock(ctx, parentID) // Because the dependency must have been fulfilled by the time this function // is called - we don't expect [err] to be non-nil. But it is handled for - // completness and future proofing. + // completeness and future proofing. if err != nil || !(parent.Status() == choices.Accepted || t.Consensus.Processing(parentID)) { // if the parent isn't processing or the last accepted block, then this // block is effectively rejected - t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } // By ensuring that the parent is either processing or accepted, it is @@ -986,8 +985,7 @@ func (t *Transitive) deliver( return err } if !blkAdded { - t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } // Add all the oracle blocks if they exist. We call verify on all the blocks @@ -1026,7 +1024,9 @@ func (t *Transitive) deliver( t.sendQuery(ctx, blkID, blk.Bytes(), push) } - t.blocked.Fulfill(ctx, blkID) + if err := t.blocked.Fulfill(ctx, blkID); err != nil { + return err + } for _, blk := range added { blkID := blk.ID() if t.Consensus.IsPreferred(blkID) { @@ -1034,7 +1034,9 @@ func (t *Transitive) deliver( } t.removeFromPending(blk) - t.blocked.Fulfill(ctx, blkID) + if err := t.blocked.Fulfill(ctx, blkID); err != nil { + return err + } if req, ok := t.blkReqs.DeleteValue(blkID); ok { delete(t.blkReqSourceMetric, req) } @@ -1042,7 +1044,9 @@ func (t *Transitive) deliver( for _, blk := range dropped { blkID := blk.ID() t.removeFromPending(blk) - t.blocked.Abandon(ctx, blkID) + if err := t.blocked.Abandon(ctx, blkID); err != nil { + return err + } if req, ok := t.blkReqs.DeleteValue(blkID); ok { delete(t.blkReqSourceMetric, req) } @@ -1052,12 +1056,12 @@ func (t *Transitive) deliver( // immediately by votes that were pending their issuance. 
If this is the // case, we should not be requesting any chits. if t.Consensus.NumProcessing() == 0 { - return t.errs.Err + return nil } // If we should issue multiple queries at the same time, we need to repoll t.repoll(ctx) - return t.errs.Err + return nil } // Returns true if the block whose ID is [blkID] is waiting to be issued to consensus diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 8c912df64a4..92e9278459c 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -170,7 +170,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // job blocked on [parent]'s issuance. require.NoError(engine.Put(context.Background(), peerID, 0, child.Bytes())) require.NotNil(request) - require.Len(engine.blocked, 1) + require.Equal(1, engine.blocked.Len()) vm.ParseBlockF = func(context.Context, []byte) (snowman.Block, error) { return nil, errUnknownBytes @@ -179,7 +179,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // Because this request doesn't provide [parent], the [child] job should be // cancelled. 
require.NoError(engine.Put(context.Background(), request.NodeID, request.RequestID, nil)) - require.Empty(engine.blocked) + require.Zero(engine.blocked.Len()) } func TestEngineQuery(t *testing.T) { @@ -315,7 +315,7 @@ func TestEngineQuery(t *testing.T) { require.NoError(engine.Put(context.Background(), getRequest.NodeID, getRequest.RequestID, child.Bytes())) require.Equal(choices.Accepted, parent.Status()) require.Equal(choices.Accepted, child.Status()) - require.Empty(engine.blocked) + require.Zero(engine.blocked.Len()) } func TestEngineMultipleQuery(t *testing.T) { @@ -461,7 +461,7 @@ func TestEngineMultipleQuery(t *testing.T) { require.NoError(te.Chits(context.Background(), vdr2, *queryRequestID, blk0.ID(), blk0.ID(), blk0.ID())) require.Equal(choices.Accepted, blk1.Status()) - require.Empty(te.blocked) + require.Zero(te.blocked.Len()) } func TestEngineBlockedIssue(t *testing.T) { @@ -879,12 +879,12 @@ func TestEngineAbandonChit(t *testing.T) { // Register a voter dependency on an unknown block. require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Len(te.blocked, 1) + require.Equal(1, te.blocked.Len()) sender.CantSendPullQuery = false require.NoError(te.GetFailed(context.Background(), vdr, reqID)) - require.Empty(te.blocked) + require.Zero(te.blocked.Len()) } func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { @@ -932,7 +932,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Register a voter dependency on an unknown block. require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Len(te.blocked, 1) + require.Equal(1, te.blocked.Len()) sender.CantSendPullQuery = false @@ -944,7 +944,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Respond with an unexpected block and verify that the request is correctly // cleared. 
require.NoError(te.Put(context.Background(), vdr, reqID, snowmantest.GenesisBytes)) - require.Empty(te.blocked) + require.Zero(te.blocked.Len()) } func TestEngineBlockingChitRequest(t *testing.T) { @@ -988,7 +988,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { require.NoError(te.PushQuery(context.Background(), vdr, 0, blockingBlk.Bytes(), 0)) - require.Len(te.blocked, 2) + require.Equal(2, te.blocked.Len()) sender.CantSendPullQuery = false @@ -1000,7 +1000,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { te.metrics.issued.WithLabelValues(unknownSource), )) - require.Empty(te.blocked) + require.Zero(te.blocked.Len()) } func TestEngineBlockingChitResponse(t *testing.T) { @@ -1098,7 +1098,7 @@ func TestEngineBlockingChitResponse(t *testing.T) { missingBlk.ID(), blockingBlk.ID(), )) - require.Len(te.blocked, 2) + require.Equal(2, te.blocked.Len()) queryRequest = nil sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, blkID ids.ID, requestedHeight uint64) { diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index f987faf2aac..bf0b289d087 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -9,39 +9,21 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/snowman/event" "github.com/ava-labs/avalanchego/utils/bag" - "github.com/ava-labs/avalanchego/utils/set" ) +var _ event.Job = (*voter)(nil) + // Voter records chits received from [vdr] once its dependencies are met. type voter struct { t *Transitive - vdr ids.NodeID + nodeID ids.NodeID requestID uint32 responseOptions []ids.ID - deps set.Set[ids.ID] -} - -func (v *voter) Dependencies() set.Set[ids.ID] { - return v.deps -} - -// Mark that a dependency has been met. -func (v *voter) Fulfill(ctx context.Context, id ids.ID) { - v.deps.Remove(id) - v.Update(ctx) -} - -// Abandon this attempt to record chits. 
-func (v *voter) Abandon(ctx context.Context, id ids.ID) { - v.Fulfill(ctx, id) } -func (v *voter) Update(ctx context.Context) { - if v.deps.Len() != 0 || v.t.errs.Errored() { - return - } - +func (v *voter) Execute(ctx context.Context) error { var ( vote ids.ID shouldVote bool @@ -60,13 +42,13 @@ func (v *voter) Update(ctx context.Context) { var results []bag.Bag[ids.ID] if shouldVote { v.t.selectedVoteIndex.Observe(float64(voteIndex)) - results = v.t.polls.Vote(v.requestID, v.vdr, vote) + results = v.t.polls.Vote(v.requestID, v.nodeID, vote) } else { - results = v.t.polls.Drop(v.requestID, v.vdr) + results = v.t.polls.Drop(v.requestID, v.nodeID) } if len(results) == 0 { - return + return nil } for _, result := range results { @@ -75,24 +57,24 @@ func (v *voter) Update(ctx context.Context) { zap.Stringer("result", &result), ) if err := v.t.Consensus.RecordPoll(ctx, result); err != nil { - v.t.errs.Add(err) + return err } } - if v.t.errs.Errored() { - return - } - if err := v.t.VM.SetPreference(ctx, v.t.Consensus.Preference()); err != nil { - v.t.errs.Add(err) - return + return err } if v.t.Consensus.NumProcessing() == 0 { v.t.Ctx.Log.Debug("Snowman engine can quiesce") - return + return nil } v.t.Ctx.Log.Debug("Snowman engine can't quiesce") v.t.repoll(ctx) + return nil +} + +func (v *voter) Cancel(ctx context.Context) error { + return v.Execute(ctx) } diff --git a/snow/event/blockable.go b/snow/event/blockable.go deleted file mode 100644 index 404e95c2aee..00000000000 --- a/snow/event/blockable.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package event - -import ( - "context" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -// Blockable defines what an object must implement to be able to block on -// dependent events being completed. 
-type Blockable interface { - // IDs that this object is blocking on - Dependencies() set.Set[ids.ID] - // Notify this object that an event has been fulfilled - Fulfill(context.Context, ids.ID) - // Notify this object that an event has been abandoned - Abandon(context.Context, ids.ID) - // Update the state of this object without changing the status of any events - Update(context.Context) -} diff --git a/snow/event/blocker.go b/snow/event/blocker.go deleted file mode 100644 index 9c15ffb5060..00000000000 --- a/snow/event/blocker.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package event - -import ( - "context" - "fmt" - "strings" - - "github.com/ava-labs/avalanchego/ids" -) - -const ( - minBlockerSize = 16 -) - -// Blocker tracks Blockable events. -// Blocker is used to track events that require their dependencies to be -// fulfilled before them. Once a Blockable event is registered, it will be -// notified once any of its dependencies are fulfilled or abandoned. 
-type Blocker map[ids.ID][]Blockable - -func (b *Blocker) init() { - if *b == nil { - *b = make(map[ids.ID][]Blockable, minBlockerSize) - } -} - -// Returns the number of items that have dependencies waiting on -// them to be fulfilled -func (b *Blocker) Len() int { - return len(*b) -} - -// Fulfill notifies all objects blocking on the event whose ID is that -// the event has happened -func (b *Blocker) Fulfill(ctx context.Context, id ids.ID) { - b.init() - - blocking := (*b)[id] - delete(*b, id) - - for _, pending := range blocking { - pending.Fulfill(ctx, id) - } -} - -// Abandon notifies all objects blocking on the event whose ID is that -// the event has been abandoned -func (b *Blocker) Abandon(ctx context.Context, id ids.ID) { - b.init() - - blocking := (*b)[id] - delete(*b, id) - - for _, pending := range blocking { - pending.Abandon(ctx, id) - } -} - -// Register a new Blockable and its dependencies -func (b *Blocker) Register(ctx context.Context, pending Blockable) { - b.init() - - for pendingID := range pending.Dependencies() { - (*b)[pendingID] = append((*b)[pendingID], pending) - } - - pending.Update(ctx) -} - -// PrefixedString returns the same value as the String function, with all the -// new lines prefixed by [prefix] -func (b *Blocker) PrefixedString(prefix string) string { - b.init() - - sb := strings.Builder{} - sb.WriteString(fmt.Sprintf("Blocking on %d IDs:", len(*b))) - for key, value := range *b { - sb.WriteString(fmt.Sprintf("\n%sID[%s]: %d", - prefix, - key, - len(value), - )) - } - return strings.TrimSuffix(sb.String(), "\n") -} - -func (b *Blocker) String() string { - return b.PrefixedString("") -} diff --git a/snow/event/blocker_test.go b/snow/event/blocker_test.go deleted file mode 100644 index d7620bfebe1..00000000000 --- a/snow/event/blocker_test.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. 
- -package event - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -func TestBlocker(t *testing.T) { - require := require.New(t) - - b := Blocker(nil) - - a := newTestBlockable() - - id0 := ids.GenerateTestID() - id1 := ids.GenerateTestID() - id2 := ids.GenerateTestID() - - calledDep := new(bool) - a.dependencies = func() set.Set[ids.ID] { - *calledDep = true - - s := set.Of(id0, id1) - return s - } - calledFill := new(bool) - a.fulfill = func(context.Context, ids.ID) { - *calledFill = true - } - calledAbandon := new(bool) - a.abandon = func(context.Context, ids.ID) { - *calledAbandon = true - } - calledUpdate := new(bool) - a.update = func(context.Context) { - *calledUpdate = true - } - - b.Register(context.Background(), a) - - require.True(*calledDep) - require.False(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Fulfill(context.Background(), id2) - b.Abandon(context.Background(), id2) - - require.True(*calledDep) - require.False(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Fulfill(context.Background(), id0) - - require.True(*calledDep) - require.True(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Abandon(context.Background(), id0) - - require.True(*calledDep) - require.True(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Abandon(context.Background(), id1) - - require.True(*calledDep) - require.True(*calledFill) - require.True(*calledAbandon) - require.True(*calledUpdate) -} - -type testBlockable struct { - dependencies func() set.Set[ids.ID] - fulfill func(context.Context, ids.ID) - abandon func(context.Context, ids.ID) - update func(context.Context) -} - -func newTestBlockable() *testBlockable { - return &testBlockable{ - dependencies: func() set.Set[ids.ID] { - return set.Set[ids.ID]{} - }, - fulfill: 
func(context.Context, ids.ID) {}, - abandon: func(context.Context, ids.ID) {}, - update: func(context.Context) {}, - } -} - -func (b *testBlockable) Dependencies() set.Set[ids.ID] { - return b.dependencies() -} - -func (b *testBlockable) Fulfill(ctx context.Context, id ids.ID) { - b.fulfill(ctx, id) -} - -func (b *testBlockable) Abandon(ctx context.Context, id ids.ID) { - b.abandon(ctx, id) -} - -func (b *testBlockable) Update(ctx context.Context) { - b.update(ctx) -} From d73dc9537bb6f7cb7a5703d43dae8c06f81be277 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 20:35:58 -0400 Subject: [PATCH 02/18] remove fib test --- snow/engine/snowman/event/queue_test.go | 29 ------------------------- 1 file changed, 29 deletions(-) diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go index 58df4ce9266..efce6f589a3 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -37,35 +37,6 @@ func newQueueWithJob[T comparable](t *testing.T, job Job, dependencies ...T) *Qu return q } -func TestQueue_Fib(t *testing.T) { - var ( - require = require.New(t) - results []int - q = NewQueue[int]() - makeJob = func(n int) *testJob { - return &testJob{ - execute: func(ctx context.Context) error { - if n < 2 { - results = append(results, 1) - } else { - results = append(results, results[n-2]+results[n-1]) - } - return q.Fulfill(ctx, n) - }, - cancel: func(context.Context) error { return nil }, - } - } - ) - for i := 9; i >= 2; i-- { - require.NoError(q.Register(context.Background(), makeJob(i), i-2, i-1)) - } - require.Empty(results) - require.NoError(q.Register(context.Background(), makeJob(0))) - require.Equal([]int{1}, results) - require.NoError(q.Register(context.Background(), makeJob(1))) - require.Equal([]int{1, 1, 2, 3, 5, 8, 13, 21, 34, 55}, results) -} - func TestQueue_Register(t *testing.T) { var calledExecute bool userJob := &testJob{ From 
de7fba5b70c75d8f35f8e54b995cc8383ea4a9f1 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 20:56:23 -0400 Subject: [PATCH 03/18] cleanup --- snow/engine/snowman/event/queue.go | 68 ++++++++++++++++--------- snow/engine/snowman/event/queue_test.go | 2 +- snow/engine/snowman/transitive.go | 2 +- snow/engine/snowman/transitive_test.go | 22 ++++---- 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/event/queue.go index 9f73b8cba8c..34818c317f4 100644 --- a/snow/engine/snowman/event/queue.go +++ b/snow/engine/snowman/event/queue.go @@ -5,6 +5,7 @@ package event import ( "context" + "math" "github.com/ava-labs/avalanchego/utils/set" ) @@ -15,11 +16,14 @@ type Job interface { } type job[T comparable] struct { + // If empty, the job is ready to be executed. dependencies set.Set[T] - job Job + // If nil, the job has already been executed or cancelled. + job Job } type Queue[T comparable] struct { + // jobs maps a dependency to the jobs that depend on it. jobs map[T][]*job[T] } @@ -29,6 +33,13 @@ func NewQueue[T comparable]() *Queue[T] { } } +// Register a job that should be executed once all of its dependencies are +// fulfilled. In order to prevent a memory leak, all dependencies must +// eventually either be fulfilled or abandoned. +// +// While registering a job with duplicate dependencies is discouraged, it is +// allowed and treated similarly to registering the job with the dependencies +// de-duplicated. func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { if len(dependencies) == 0 { return userJob.Execute(ctx) @@ -44,43 +55,50 @@ func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) return nil } -func (q *Queue[_]) Len() int { +// NumDependencies returns the number of dependencies that jobs are currently +// blocking on. +func (q *Queue[_]) NumDependencies() int { return len(q.jobs) } +// Fulfill a dependency. 
If all dependencies for a job are fulfilled, the job +// will be executed. +// +// It is safe to call the queue during the execution of a job. func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { - jobs := q.jobs[dependency] - delete(q.jobs, dependency) - - for _, job := range jobs { - job.dependencies.Remove(dependency) - - userJob := job.job - if userJob == nil || job.dependencies.Len() != 0 { - continue - } - - // If the job was registered with duplicate dependencies, it may be - // possible for the job to be executed multiple times. To prevent this, - // we clear the job. - job.job = nil - - if err := userJob.Execute(ctx); err != nil { - return err - } - } - return nil + return q.resolveDependency(ctx, dependency, 0, Job.Execute) } +// Abandon a dependency. If any dependencies for a job are abandoned, the job +// will be cancelled. +// +// It is safe to call the queue during the cancelling of a job. func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { + return q.resolveDependency(ctx, dependency, math.MaxInt, Job.Cancel) +} + +// resolveDependency the provided dependency and execute the operation on all of +// the unexecuted jobs that have no more than [minDependencies]. +// +// For example, if [minDependencies] is 0, only jobs that have no more +// outstanding dependencies will be executed. If [minDependencies] is MaxInt, +// all jobs will be executed. +func (q *Queue[T]) resolveDependency( + ctx context.Context, + dependency T, + minDependencies int, + operation func(Job, context.Context) error, +) error { jobs := q.jobs[dependency] delete(q.jobs, dependency) for _, job := range jobs { + // Removing the dependency keeps the queue in a consistent state. + // However, it isn't strictly needed. 
job.dependencies.Remove(dependency) userJob := job.job - if userJob == nil { + if userJob == nil || job.dependencies.Len() > minDependencies { continue } @@ -88,7 +106,7 @@ func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { // with this job again. job.job = nil - if err := userJob.Cancel(ctx); err != nil { + if err := operation(userJob, ctx); err != nil { return err } } diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go index efce6f589a3..a5fbd451503 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -147,7 +147,7 @@ func TestQueue_Register(t *testing.T) { calledExecute = false require.NoError(test.queue.Register(context.Background(), userJob, test.dependencies...)) - require.Equal(test.expectedLen, test.queue.Len()) + require.Equal(test.expectedLen, test.queue.NumDependencies()) require.Equal(test.shouldExecute, calledExecute) require.Equal(test.expectedQueue, test.queue) }) diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 762e6763fde..a1d141ef3c2 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -562,7 +562,7 @@ func (t *Transitive) executeDeferredWork(ctx context.Context) error { t.metrics.numRequests.Set(float64(t.blkReqs.Len())) t.metrics.numBlocked.Set(float64(len(t.pending))) - t.metrics.numBlockers.Set(float64(t.blocked.Len())) + t.metrics.numBlockers.Set(float64(t.blocked.NumDependencies())) t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len())) return nil } diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 92e9278459c..f055c8758f1 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -170,7 +170,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // job blocked on [parent]'s issuance. 
require.NoError(engine.Put(context.Background(), peerID, 0, child.Bytes())) require.NotNil(request) - require.Equal(1, engine.blocked.Len()) + require.Equal(1, engine.blocked.NumDependencies()) vm.ParseBlockF = func(context.Context, []byte) (snowman.Block, error) { return nil, errUnknownBytes @@ -179,7 +179,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // Because this request doesn't provide [parent], the [child] job should be // cancelled. require.NoError(engine.Put(context.Background(), request.NodeID, request.RequestID, nil)) - require.Zero(engine.blocked.Len()) + require.Zero(engine.blocked.NumDependencies()) } func TestEngineQuery(t *testing.T) { @@ -315,7 +315,7 @@ func TestEngineQuery(t *testing.T) { require.NoError(engine.Put(context.Background(), getRequest.NodeID, getRequest.RequestID, child.Bytes())) require.Equal(choices.Accepted, parent.Status()) require.Equal(choices.Accepted, child.Status()) - require.Zero(engine.blocked.Len()) + require.Zero(engine.blocked.NumDependencies()) } func TestEngineMultipleQuery(t *testing.T) { @@ -461,7 +461,7 @@ func TestEngineMultipleQuery(t *testing.T) { require.NoError(te.Chits(context.Background(), vdr2, *queryRequestID, blk0.ID(), blk0.ID(), blk0.ID())) require.Equal(choices.Accepted, blk1.Status()) - require.Zero(te.blocked.Len()) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockedIssue(t *testing.T) { @@ -879,12 +879,12 @@ func TestEngineAbandonChit(t *testing.T) { // Register a voter dependency on an unknown block. 
require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Equal(1, te.blocked.Len()) + require.Equal(1, te.blocked.NumDependencies()) sender.CantSendPullQuery = false require.NoError(te.GetFailed(context.Background(), vdr, reqID)) - require.Zero(te.blocked.Len()) + require.Zero(te.blocked.NumDependencies()) } func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { @@ -932,7 +932,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Register a voter dependency on an unknown block. require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Equal(1, te.blocked.Len()) + require.Equal(1, te.blocked.NumDependencies()) sender.CantSendPullQuery = false @@ -944,7 +944,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Respond with an unexpected block and verify that the request is correctly // cleared. require.NoError(te.Put(context.Background(), vdr, reqID, snowmantest.GenesisBytes)) - require.Zero(te.blocked.Len()) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockingChitRequest(t *testing.T) { @@ -988,7 +988,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { require.NoError(te.PushQuery(context.Background(), vdr, 0, blockingBlk.Bytes(), 0)) - require.Equal(2, te.blocked.Len()) + require.Equal(2, te.blocked.NumDependencies()) sender.CantSendPullQuery = false @@ -1000,7 +1000,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { te.metrics.issued.WithLabelValues(unknownSource), )) - require.Zero(te.blocked.Len()) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockingChitResponse(t *testing.T) { @@ -1098,7 +1098,7 @@ func TestEngineBlockingChitResponse(t *testing.T) { missingBlk.ID(), blockingBlk.ID(), )) - require.Equal(2, te.blocked.Len()) + require.Equal(2, te.blocked.NumDependencies()) queryRequest = nil sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], requestID 
uint32, blkID ids.ID, requestedHeight uint64) { From 2d4472f77cc68680f249d98222af195c32527db8 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 21:03:22 -0400 Subject: [PATCH 04/18] nit --- snow/engine/snowman/issuer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index 2ccaa99e510..6ecb8b64c0b 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -10,8 +10,11 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/event" ) +var _ event.Job = (*issuer)(nil) + // issuer issues [blk] into to consensus after its dependencies are met. type issuer struct { t *Transitive From e99eee0b3d832aec7f33b21bae67a1281351902a Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 21:33:10 -0400 Subject: [PATCH 05/18] change cancelling semantics --- snow/engine/snowman/event/queue.go | 30 +++++++++-------- snow/engine/snowman/event/queue_test.go | 45 ++++++++++++++----------- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/event/queue.go index 34818c317f4..86850d350a2 100644 --- a/snow/engine/snowman/event/queue.go +++ b/snow/engine/snowman/event/queue.go @@ -5,7 +5,6 @@ package event import ( "context" - "math" "github.com/ava-labs/avalanchego/utils/set" ) @@ -18,6 +17,8 @@ type Job interface { type job[T comparable] struct { // If empty, the job is ready to be executed. dependencies set.Set[T] + // If true, the job should be cancelled. + shouldCancel bool // If nil, the job has already been executed or cancelled. job Job } @@ -66,28 +67,22 @@ func (q *Queue[_]) NumDependencies() int { // // It is safe to call the queue during the execution of a job. 
func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, 0, Job.Execute) + return q.resolveDependency(ctx, dependency, false) } // Abandon a dependency. If any dependencies for a job are abandoned, the job -// will be cancelled. +// will be cancelled. The job will only be cancelled once all dependencies are +// resolved. // // It is safe to call the queue during the cancelling of a job. func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, math.MaxInt, Job.Cancel) + return q.resolveDependency(ctx, dependency, true) } -// resolveDependency the provided dependency and execute the operation on all of -// the unexecuted jobs that have no more than [minDependencies]. -// -// For example, if [minDependencies] is 0, only jobs that have no more -// outstanding dependencies will be executed. If [minDependencies] is MaxInt, -// all jobs will be executed. func (q *Queue[T]) resolveDependency( ctx context.Context, dependency T, - minDependencies int, - operation func(Job, context.Context) error, + shouldCancel bool, ) error { jobs := q.jobs[dependency] delete(q.jobs, dependency) @@ -96,9 +91,10 @@ func (q *Queue[T]) resolveDependency( // Removing the dependency keeps the queue in a consistent state. // However, it isn't strictly needed. job.dependencies.Remove(dependency) + job.shouldCancel = shouldCancel || job.shouldCancel userJob := job.job - if userJob == nil || job.dependencies.Len() > minDependencies { + if userJob == nil || job.dependencies.Len() != 0 { continue } @@ -106,7 +102,13 @@ func (q *Queue[T]) resolveDependency( // with this job again. 
job.job = nil - if err := operation(userJob, ctx); err != nil { + var err error + if job.shouldCancel { + err = userJob.Cancel(ctx) + } else { + err = userJob.Execute(ctx) + } + if err != nil { return err } } diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go index a5fbd451503..9d8d09fc345 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -31,9 +31,16 @@ func (j *testJob) Cancel(ctx context.Context) error { return j.cancel(ctx) } -func newQueueWithJob[T comparable](t *testing.T, job Job, dependencies ...T) *Queue[T] { +func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dependencies ...T) *Queue[T] { q := NewQueue[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) + if shouldCancel { + for _, dependency := range dependencies { + for _, j := range q.jobs[dependency] { + j.shouldCancel = true + } + } + } return q } @@ -183,25 +190,25 @@ func TestQueue_Fulfill(t *testing.T) { }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, 1), + queue: newQueueWithJob(t, userJob, false, 1), shouldExecute: true, expectedQueue: NewQueue[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, 2), + queue: newQueueWithJob(t, userJob, false, 2), shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, 1, 2), + queue: newQueueWithJob(t, userJob, false, 1, 2), shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, 1, 1), + queue: newQueueWithJob(t, userJob, false, 1, 1), shouldExecute: true, expectedQueue: NewQueue[int](), }, @@ -221,16 +228,16 @@ func TestQueue_Fulfill(t *testing.T) { } func TestQueue_Abandon(t 
*testing.T) { - var calledAbandon bool + var calledCancel bool userJob := &testJob{ execute: func(context.Context) error { return errUnexpectedInvocation }, cancel: func(context.Context) error { - if calledAbandon { + if calledCancel { return errDuplicateInvocation } - calledAbandon = true + calledCancel = true return nil }, } @@ -249,25 +256,25 @@ func TestQueue_Abandon(t *testing.T) { }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, 1), + queue: newQueueWithJob(t, userJob, false, 1), shouldCancel: true, expectedQueue: NewQueue[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, 2), + queue: newQueueWithJob(t, userJob, false, 2), shouldCancel: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, 1, 2), - shouldCancel: true, - expectedQueue: newQueueWithJob(t, nil, 2), + queue: newQueueWithJob(t, userJob, false, 1, 2), + shouldCancel: false, + expectedQueue: newQueueWithJob(t, userJob, true, 2), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, 1, 1), + queue: newQueueWithJob(t, userJob, false, 1, 1), shouldCancel: true, expectedQueue: NewQueue[int](), }, @@ -277,10 +284,10 @@ func TestQueue_Abandon(t *testing.T) { require := require.New(t) // Reset the variable between tests - calledAbandon = false + calledCancel = false require.NoError(test.queue.Abandon(context.Background(), 1)) - require.Equal(test.shouldCancel, calledAbandon) + require.Equal(test.shouldCancel, calledCancel) require.Equal(test.expectedQueue, test.queue) }) } From 4391c662955fd2dc742c2c324c803f5dd10d9fb4 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 17 Jun 2024 21:35:23 -0400 Subject: [PATCH 06/18] nit --- snow/engine/snowman/event/queue_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snow/engine/snowman/event/queue_test.go 
b/snow/engine/snowman/event/queue_test.go index 9d8d09fc345..2eea2e93378 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -35,8 +35,8 @@ func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dep q := NewQueue[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) if shouldCancel { - for _, dependency := range dependencies { - for _, j := range q.jobs[dependency] { + for _, jobs := range q.jobs { + for _, j := range jobs { j.shouldCancel = true } } From ba37754f7da7741183040ef6f15c15ea7f10a30e Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 11:51:03 -0400 Subject: [PATCH 07/18] nit --- snow/engine/snowman/transitive.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index a1d141ef3c2..7d03b65a9a0 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -757,8 +757,7 @@ func (t *Transitive) issueFrom( delete(t.blkReqSourceMetric, req) } - issued := t.Consensus.Decided(blk) || t.Consensus.Processing(blkID) - if !issued { + if !t.Consensus.Decided(blk) && !t.Consensus.Processing(blkID) { return false, nil } From da11c5ca3026607d19beb301682c406f7f200ec1 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 12:04:56 -0400 Subject: [PATCH 08/18] cleanup from review --- snow/engine/snowman/event/queue.go | 18 +-- snow/engine/snowman/event/queue_test.go | 198 ++++++++++++------------ snow/engine/snowman/voter.go | 2 +- 3 files changed, 107 insertions(+), 111 deletions(-) diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/event/queue.go index 86850d350a2..c0bc3f52d93 100644 --- a/snow/engine/snowman/event/queue.go +++ b/snow/engine/snowman/event/queue.go @@ -24,13 +24,13 @@ type job[T comparable] struct { } type Queue[T comparable] struct { - // jobs maps a dependency to the jobs that depend on 
it. - jobs map[T][]*job[T] + // dependents maps a dependency to the jobs that depend on it. + dependents map[T][]*job[T] } func NewQueue[T comparable]() *Queue[T] { return &Queue[T]{ - jobs: make(map[T][]*job[T]), + dependents: make(map[T][]*job[T]), } } @@ -50,8 +50,8 @@ func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) dependencies: set.Of(dependencies...), job: userJob, } - for _, dependency := range dependencies { - q.jobs[dependency] = append(q.jobs[dependency], j) + for _, d := range dependencies { + q.dependents[d] = append(q.dependents[d], j) } return nil } @@ -59,7 +59,7 @@ func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) // NumDependencies returns the number of dependencies that jobs are currently // blocking on. func (q *Queue[_]) NumDependencies() int { - return len(q.jobs) + return len(q.dependents) } // Fulfill a dependency. If all dependencies for a job are fulfilled, the job @@ -84,8 +84,8 @@ func (q *Queue[T]) resolveDependency( dependency T, shouldCancel bool, ) error { - jobs := q.jobs[dependency] - delete(q.jobs, dependency) + jobs := q.dependents[dependency] + delete(q.dependents, dependency) for _, job := range jobs { // Removing the dependency keeps the queue in a consistent state. @@ -98,7 +98,7 @@ func (q *Queue[T]) resolveDependency( continue } - // Mark the job as cancelled so that any reentrant calls do not interact + // Mark the job as handled so that any reentrant calls do not interact // with this job again. 
job.job = nil diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go index 2eea2e93378..b6e6e00fca0 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -13,6 +13,11 @@ import ( "github.com/ava-labs/avalanchego/utils/set" ) +const ( + depToResolve = iota + depToNeglect +) + var ( errDuplicateInvocation = errors.New("job already handled") errUnexpectedInvocation = errors.New("job handled unexpectedly") @@ -35,7 +40,7 @@ func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dep q := NewQueue[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) if shouldCancel { - for _, jobs := range q.jobs { + for _, jobs := range q.dependents { for _, j := range jobs { j.shouldCancel = true } @@ -60,32 +65,32 @@ func TestQueue_Register(t *testing.T) { } tests := []struct { - name string - queue *Queue[int] - dependencies []int - shouldExecute bool - expectedLen int - expectedQueue *Queue[int] + name string + queue *Queue[int] + dependencies []int + wantExecuted bool + wantLen int + wantQueue *Queue[int] }{ { - name: "no dependencies", - queue: NewQueue[int](), - dependencies: nil, - shouldExecute: true, - expectedLen: 0, - expectedQueue: NewQueue[int](), + name: "no dependencies", + queue: NewQueue[int](), + dependencies: nil, + wantExecuted: true, + wantLen: 0, + wantQueue: NewQueue[int](), }, { - name: "one dependency", - queue: NewQueue[int](), - dependencies: []int{1}, - shouldExecute: false, - expectedLen: 1, - expectedQueue: &Queue[int]{ - jobs: map[int][]*job[int]{ - 1: { + name: "one dependency", + queue: NewQueue[int](), + dependencies: []int{depToResolve}, + wantExecuted: false, + wantLen: 1, + wantQueue: &Queue[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { { - dependencies: set.Of(1), + dependencies: set.Of(depToResolve), job: userJob, }, }, @@ -93,22 +98,22 @@ func TestQueue_Register(t *testing.T) { }, }, { - name: "two 
dependencies", - queue: NewQueue[int](), - dependencies: []int{1, 2}, - shouldExecute: false, - expectedLen: 2, - expectedQueue: &Queue[int]{ - jobs: map[int][]*job[int]{ - 1: { + name: "two dependencies", + queue: NewQueue[int](), + dependencies: []int{depToResolve, depToNeglect}, + wantExecuted: false, + wantLen: 2, + wantQueue: &Queue[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { { - dependencies: set.Of(1, 2), + dependencies: set.Of(depToResolve, depToNeglect), job: userJob, }, }, - 2: { + depToNeglect: { { - dependencies: set.Of(1, 2), + dependencies: set.Of(depToResolve, depToNeglect), job: userJob, }, }, @@ -116,29 +121,20 @@ func TestQueue_Register(t *testing.T) { }, }, { - name: "additional dependency", - queue: &Queue[int]{ - jobs: map[int][]*job[int]{ - 1: { + name: "additional dependency", + queue: newQueueWithJob(t, userJob, false, depToResolve), + dependencies: []int{depToResolve}, + wantExecuted: false, + wantLen: 1, + wantQueue: &Queue[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { { - dependencies: set.Of(1), + dependencies: set.Of(depToResolve), job: userJob, }, - }, - }, - }, - dependencies: []int{1}, - shouldExecute: false, - expectedLen: 1, - expectedQueue: &Queue[int]{ - jobs: map[int][]*job[int]{ - 1: { { - dependencies: set.Of(1), - job: userJob, - }, - { - dependencies: set.Of(1), + dependencies: set.Of(depToResolve), job: userJob, }, }, @@ -154,9 +150,9 @@ func TestQueue_Register(t *testing.T) { calledExecute = false require.NoError(test.queue.Register(context.Background(), userJob, test.dependencies...)) - require.Equal(test.expectedLen, test.queue.NumDependencies()) - require.Equal(test.shouldExecute, calledExecute) - require.Equal(test.expectedQueue, test.queue) + require.Equal(test.wantLen, test.queue.NumDependencies()) + require.Equal(test.wantExecuted, calledExecute) + require.Equal(test.wantQueue, test.queue) }) } } @@ -177,40 +173,40 @@ func TestQueue_Fulfill(t *testing.T) { } tests := []struct { - name 
string - queue *Queue[int] - shouldExecute bool - expectedQueue *Queue[int] + name string + queue *Queue[int] + wantExecute bool + wantQueue *Queue[int] }{ { - name: "no jobs", - queue: NewQueue[int](), - shouldExecute: false, - expectedQueue: NewQueue[int](), + name: "no jobs", + queue: NewQueue[int](), + wantExecute: false, + wantQueue: NewQueue[int](), }, { - name: "single dependency", - queue: newQueueWithJob(t, userJob, false, 1), - shouldExecute: true, - expectedQueue: NewQueue[int](), + name: "single dependency", + queue: newQueueWithJob(t, userJob, false, depToResolve), + wantExecute: true, + wantQueue: NewQueue[int](), }, { - name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, false, 2), - shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, false, 2), + name: "non-existent dependency", + queue: newQueueWithJob(t, userJob, false, depToNeglect), + wantExecute: false, + wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), }, { - name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, false, 1, 2), - shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, false, 2), + name: "incomplete dependencies", + queue: newQueueWithJob(t, userJob, false, depToResolve, depToNeglect), + wantExecute: false, + wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), }, { - name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, false, 1, 1), - shouldExecute: true, - expectedQueue: NewQueue[int](), + name: "duplicate dependency", + queue: newQueueWithJob(t, userJob, false, depToResolve, depToResolve), + wantExecute: true, + wantQueue: NewQueue[int](), }, } for _, test := range tests { @@ -220,9 +216,9 @@ func TestQueue_Fulfill(t *testing.T) { // Reset the variable between tests calledExecute = false - require.NoError(test.queue.Fulfill(context.Background(), 1)) - require.Equal(test.shouldExecute, calledExecute) - require.Equal(test.expectedQueue, test.queue) + 
require.NoError(test.queue.Fulfill(context.Background(), depToResolve)) + require.Equal(test.wantExecute, calledExecute) + require.Equal(test.wantQueue, test.queue) }) } } @@ -245,38 +241,38 @@ func TestQueue_Abandon(t *testing.T) { tests := []struct { name string queue *Queue[int] - shouldCancel bool - expectedQueue *Queue[int] + wantCancelled bool + wantQueue *Queue[int] }{ { name: "no jobs", queue: NewQueue[int](), - shouldCancel: false, - expectedQueue: NewQueue[int](), + wantCancelled: false, + wantQueue: NewQueue[int](), }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, false, 1), - shouldCancel: true, - expectedQueue: NewQueue[int](), + queue: newQueueWithJob(t, userJob, false, depToResolve), + wantCancelled: true, + wantQueue: NewQueue[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, false, 2), - shouldCancel: false, - expectedQueue: newQueueWithJob(t, userJob, false, 2), + queue: newQueueWithJob(t, userJob, false, depToNeglect), + wantCancelled: false, + wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, false, 1, 2), - shouldCancel: false, - expectedQueue: newQueueWithJob(t, userJob, true, 2), + queue: newQueueWithJob(t, userJob, false, depToResolve, depToNeglect), + wantCancelled: false, + wantQueue: newQueueWithJob(t, userJob, true, depToNeglect), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, false, 1, 1), - shouldCancel: true, - expectedQueue: NewQueue[int](), + queue: newQueueWithJob(t, userJob, false, depToResolve, depToResolve), + wantCancelled: true, + wantQueue: NewQueue[int](), }, } for _, test := range tests { @@ -286,9 +282,9 @@ func TestQueue_Abandon(t *testing.T) { // Reset the variable between tests calledCancel = false - require.NoError(test.queue.Abandon(context.Background(), 1)) - require.Equal(test.shouldCancel, calledCancel) - require.Equal(test.expectedQueue, test.queue) + 
require.NoError(test.queue.Abandon(context.Background(), depToResolve)) + require.Equal(test.wantCancelled, calledCancel) + require.Equal(test.wantQueue, test.queue) }) } } diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index bf0b289d087..bda4e447e9f 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -15,7 +15,7 @@ import ( var _ event.Job = (*voter)(nil) -// Voter records chits received from [vdr] once its dependencies are met. +// Voter records chits received from [nodeID] once its dependencies are met. type voter struct { t *Transitive nodeID ids.NodeID From 6ec2445f0363d39407a67c9a246d417a8461fc4a Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 12:30:41 -0400 Subject: [PATCH 09/18] rename everything --- snow/engine/snowman/issuer.go | 4 +- .../{event/queue.go => job/scheduler.go} | 37 ++-- .../queue_test.go => job/scheduler_test.go} | 163 +++++++++--------- snow/engine/snowman/transitive.go | 6 +- snow/engine/snowman/voter.go | 4 +- 5 files changed, 113 insertions(+), 101 deletions(-) rename snow/engine/snowman/{event/queue.go => job/scheduler.go} (68%) rename snow/engine/snowman/{event/queue_test.go => job/scheduler_test.go} (50%) diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index 6ecb8b64c0b..efb23a90b5e 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -10,10 +10,10 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/snow/engine/snowman/event" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" ) -var _ event.Job = (*issuer)(nil) +var _ job.Job = (*issuer)(nil) // issuer issues [blk] into to consensus after its dependencies are met. 
type issuer struct { diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/job/scheduler.go similarity index 68% rename from snow/engine/snowman/event/queue.go rename to snow/engine/snowman/job/scheduler.go index c0bc3f52d93..1d62f0fac93 100644 --- a/snow/engine/snowman/event/queue.go +++ b/snow/engine/snowman/job/scheduler.go @@ -1,7 +1,9 @@ // Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. -package event +// Package job provides a Scheduler to manage and execute Jobs with +// dependencies. +package job import ( "context" @@ -9,6 +11,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" ) +// Job is a unit of work that can be executed or cancelled. type Job interface { Execute(context.Context) error Cancel(context.Context) error @@ -23,13 +26,17 @@ type job[T comparable] struct { job Job } -type Queue[T comparable] struct { +// Scheduler implements a dependency graph for jobs. Jobs can be registered with +// dependencies, and once all dependencies are fulfilled, the job will be +// executed. If any of the dependencies are abandoned, the job will be +// cancelled. +type Scheduler[T comparable] struct { // dependents maps a dependency to the jobs that depend on it. dependents map[T][]*job[T] } -func NewQueue[T comparable]() *Queue[T] { - return &Queue[T]{ +func NewScheduler[T comparable]() *Scheduler[T] { + return &Scheduler[T]{ dependents: make(map[T][]*job[T]), } } @@ -41,7 +48,7 @@ func NewQueue[T comparable]() *Queue[T] { // While registering a job with duplicate dependencies is discouraged, it is // allowed and treated similarly to registering the job with the dependencies // de-duplicated. 
-func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { +func (s *Scheduler[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { if len(dependencies) == 0 { return userJob.Execute(ctx) } @@ -51,23 +58,23 @@ func (q *Queue[T]) Register(ctx context.Context, userJob Job, dependencies ...T) job: userJob, } for _, d := range dependencies { - q.dependents[d] = append(q.dependents[d], j) + s.dependents[d] = append(s.dependents[d], j) } return nil } // NumDependencies returns the number of dependencies that jobs are currently // blocking on. -func (q *Queue[_]) NumDependencies() int { - return len(q.dependents) +func (s *Scheduler[_]) NumDependencies() int { + return len(s.dependents) } // Fulfill a dependency. If all dependencies for a job are fulfilled, the job // will be executed. // // It is safe to call the queue during the execution of a job. -func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, false) +func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { + return s.resolveDependency(ctx, dependency, false) } // Abandon a dependency. If any dependencies for a job are abandoned, the job @@ -75,17 +82,17 @@ func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { // resolved. // // It is safe to call the queue during the cancelling of a job. 
-func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, true) +func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error { + return s.resolveDependency(ctx, dependency, true) } -func (q *Queue[T]) resolveDependency( +func (s *Scheduler[T]) resolveDependency( ctx context.Context, dependency T, shouldCancel bool, ) error { - jobs := q.dependents[dependency] - delete(q.dependents, dependency) + jobs := s.dependents[dependency] + delete(s.dependents, dependency) for _, job := range jobs { // Removing the dependency keeps the queue in a consistent state. diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/job/scheduler_test.go similarity index 50% rename from snow/engine/snowman/event/queue_test.go rename to snow/engine/snowman/job/scheduler_test.go index b6e6e00fca0..95c6315184c 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. 
-package event +package job import ( "context" @@ -36,8 +36,13 @@ func (j *testJob) Cancel(ctx context.Context) error { return j.cancel(ctx) } -func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dependencies ...T) *Queue[T] { - q := NewQueue[T]() +func newSchedulerWithJob[T comparable]( + t *testing.T, + job Job, + shouldCancel bool, + dependencies ...T, +) *Scheduler[T] { + q := NewScheduler[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) if shouldCancel { for _, jobs := range q.dependents { @@ -49,7 +54,7 @@ func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dep return q } -func TestQueue_Register(t *testing.T) { +func TestScheduler_Register(t *testing.T) { var calledExecute bool userJob := &testJob{ execute: func(context.Context) error { @@ -65,28 +70,28 @@ func TestQueue_Register(t *testing.T) { } tests := []struct { - name string - queue *Queue[int] - dependencies []int - wantExecuted bool - wantLen int - wantQueue *Queue[int] + name string + scheduler *Scheduler[int] + dependencies []int + wantExecuted bool + wantNumDependencies int + wantScheduler *Scheduler[int] }{ { - name: "no dependencies", - queue: NewQueue[int](), - dependencies: nil, - wantExecuted: true, - wantLen: 0, - wantQueue: NewQueue[int](), + name: "no dependencies", + scheduler: NewScheduler[int](), + dependencies: nil, + wantExecuted: true, + wantNumDependencies: 0, + wantScheduler: NewScheduler[int](), }, { - name: "one dependency", - queue: NewQueue[int](), - dependencies: []int{depToResolve}, - wantExecuted: false, - wantLen: 1, - wantQueue: &Queue[int]{ + name: "one dependency", + scheduler: NewScheduler[int](), + dependencies: []int{depToResolve}, + wantExecuted: false, + wantNumDependencies: 1, + wantScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -98,12 +103,12 @@ func TestQueue_Register(t *testing.T) { }, }, { - name: "two dependencies", - queue: NewQueue[int](), - 
dependencies: []int{depToResolve, depToNeglect}, - wantExecuted: false, - wantLen: 2, - wantQueue: &Queue[int]{ + name: "two dependencies", + scheduler: NewScheduler[int](), + dependencies: []int{depToResolve, depToNeglect}, + wantExecuted: false, + wantNumDependencies: 2, + wantScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -121,12 +126,12 @@ func TestQueue_Register(t *testing.T) { }, }, { - name: "additional dependency", - queue: newQueueWithJob(t, userJob, false, depToResolve), - dependencies: []int{depToResolve}, - wantExecuted: false, - wantLen: 1, - wantQueue: &Queue[int]{ + name: "additional dependency", + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), + dependencies: []int{depToResolve}, + wantExecuted: false, + wantNumDependencies: 1, + wantScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -149,15 +154,15 @@ func TestQueue_Register(t *testing.T) { // Reset the variable between tests calledExecute = false - require.NoError(test.queue.Register(context.Background(), userJob, test.dependencies...)) - require.Equal(test.wantLen, test.queue.NumDependencies()) + require.NoError(test.scheduler.Register(context.Background(), userJob, test.dependencies...)) + require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) require.Equal(test.wantExecuted, calledExecute) - require.Equal(test.wantQueue, test.queue) + require.Equal(test.wantScheduler, test.scheduler) }) } } -func TestQueue_Fulfill(t *testing.T) { +func TestScheduler_Fulfill(t *testing.T) { var calledExecute bool userJob := &testJob{ execute: func(context.Context) error { @@ -173,40 +178,40 @@ func TestQueue_Fulfill(t *testing.T) { } tests := []struct { - name string - queue *Queue[int] - wantExecute bool - wantQueue *Queue[int] + name string + scheduler *Scheduler[int] + wantExecute bool + wantScheduler *Scheduler[int] }{ { - name: "no jobs", - queue: NewQueue[int](), - wantExecute: false, - wantQueue: 
NewQueue[int](), + name: "no jobs", + scheduler: NewScheduler[int](), + wantExecute: false, + wantScheduler: NewScheduler[int](), }, { - name: "single dependency", - queue: newQueueWithJob(t, userJob, false, depToResolve), - wantExecute: true, - wantQueue: NewQueue[int](), + name: "single dependency", + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), + wantExecute: true, + wantScheduler: NewScheduler[int](), }, { - name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, false, depToNeglect), - wantExecute: false, - wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), + name: "non-existent dependency", + scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + wantExecute: false, + wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), }, { - name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, false, depToResolve, depToNeglect), - wantExecute: false, - wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), + name: "incomplete dependencies", + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToNeglect), + wantExecute: false, + wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), }, { - name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, false, depToResolve, depToResolve), - wantExecute: true, - wantQueue: NewQueue[int](), + name: "duplicate dependency", + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), + wantExecute: true, + wantScheduler: NewScheduler[int](), }, } for _, test := range tests { @@ -216,14 +221,14 @@ func TestQueue_Fulfill(t *testing.T) { // Reset the variable between tests calledExecute = false - require.NoError(test.queue.Fulfill(context.Background(), depToResolve)) + require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) require.Equal(test.wantExecute, calledExecute) - require.Equal(test.wantQueue, test.queue) + require.Equal(test.wantScheduler, test.scheduler) }) 
} } -func TestQueue_Abandon(t *testing.T) { +func TestScheduler_Abandon(t *testing.T) { var calledCancel bool userJob := &testJob{ execute: func(context.Context) error { @@ -240,39 +245,39 @@ func TestQueue_Abandon(t *testing.T) { tests := []struct { name string - queue *Queue[int] + scheduler *Scheduler[int] wantCancelled bool - wantQueue *Queue[int] + wantScheduler *Scheduler[int] }{ { name: "no jobs", - queue: NewQueue[int](), + scheduler: NewScheduler[int](), wantCancelled: false, - wantQueue: NewQueue[int](), + wantScheduler: NewScheduler[int](), }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, false, depToResolve), + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), wantCancelled: true, - wantQueue: NewQueue[int](), + wantScheduler: NewScheduler[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), wantCancelled: false, - wantQueue: newQueueWithJob(t, userJob, false, depToNeglect), + wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, false, depToResolve, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToNeglect), wantCancelled: false, - wantQueue: newQueueWithJob(t, userJob, true, depToNeglect), + wantScheduler: newSchedulerWithJob(t, userJob, true, depToNeglect), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, false, depToResolve, depToResolve), + scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), wantCancelled: true, - wantQueue: NewQueue[int](), + wantScheduler: NewScheduler[int](), }, } for _, test := range tests { @@ -282,9 +287,9 @@ func TestQueue_Abandon(t *testing.T) { // Reset the variable between tests calledCancel = false - require.NoError(test.queue.Abandon(context.Background(), depToResolve)) + 
require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) require.Equal(test.wantCancelled, calledCancel) - require.Equal(test.wantQueue, test.queue) + require.Equal(test.wantScheduler, test.scheduler) }) } } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index c0c8c577d90..5a296910aaf 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -21,7 +21,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/ancestor" - "github.com/ava-labs/avalanchego/snow/engine/snowman/event" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/bag" "github.com/ava-labs/avalanchego/utils/bimap" @@ -82,7 +82,7 @@ type Transitive struct { // operations that are blocked on a block being issued. This could be // issuing another block, responding to a query, or applying votes to consensus - blocked *event.Queue[ids.ID] + blocked *job.Scheduler[ids.ID] // number of times build block needs to be called once the number of // processing blocks has gone below the optimal number. 
@@ -143,7 +143,7 @@ func New(config Config) (*Transitive, error) { nonVerifieds: ancestor.NewTree(), nonVerifiedCache: nonVerifiedCache, acceptedFrontiers: acceptedFrontiers, - blocked: event.NewQueue[ids.ID](), + blocked: job.NewScheduler[ids.ID](), polls: polls, blkReqs: bimap.New[common.Request, ids.ID](), blkReqSourceMetric: make(map[common.Request]prometheus.Counter), diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index bda4e447e9f..4332e969dad 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -9,11 +9,11 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/engine/snowman/event" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" "github.com/ava-labs/avalanchego/utils/bag" ) -var _ event.Job = (*voter)(nil) +var _ job.Job = (*voter)(nil) // Voter records chits received from [nodeID] once its dependencies are met. type voter struct { From 5d8a48eff931723c13b7189789a1dd4e7432ee13 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 12:40:29 -0400 Subject: [PATCH 10/18] cleanup tests --- snow/engine/snowman/job/scheduler_test.go | 90 ++++++++--------------- 1 file changed, 32 insertions(+), 58 deletions(-) diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index 95c6315184c..9e5169fd0b8 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -18,22 +18,32 @@ const ( depToNeglect ) -var ( - errDuplicateInvocation = errors.New("job already handled") - errUnexpectedInvocation = errors.New("job handled unexpectedly") -) +var errDuplicateInvocation = errors.New("job already handled") type testJob struct { - execute func(context.Context) error - cancel func(context.Context) error + calledExecute bool + calledCancel bool } -func (j *testJob) Execute(ctx context.Context) error { - return j.execute(ctx) +func (j *testJob) 
Execute(context.Context) error { + if j.calledExecute { + return errDuplicateInvocation + } + j.calledExecute = true + return nil +} + +func (j *testJob) Cancel(context.Context) error { + if j.calledCancel { + return errDuplicateInvocation + } + j.calledCancel = true + return nil } -func (j *testJob) Cancel(ctx context.Context) error { - return j.cancel(ctx) +func (j *testJob) reset() { + j.calledExecute = false + j.calledCancel = false } func newSchedulerWithJob[T comparable]( @@ -55,20 +65,7 @@ func newSchedulerWithJob[T comparable]( } func TestScheduler_Register(t *testing.T) { - var calledExecute bool - userJob := &testJob{ - execute: func(context.Context) error { - if calledExecute { - return errDuplicateInvocation - } - calledExecute = true - return nil - }, - cancel: func(context.Context) error { - return errUnexpectedInvocation - }, - } - + userJob := &testJob{} tests := []struct { name string scheduler *Scheduler[int] @@ -152,31 +149,19 @@ func TestScheduler_Register(t *testing.T) { require := require.New(t) // Reset the variable between tests - calledExecute = false + userJob.reset() require.NoError(test.scheduler.Register(context.Background(), userJob, test.dependencies...)) require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) - require.Equal(test.wantExecuted, calledExecute) + require.Equal(test.wantExecuted, userJob.calledExecute) + require.False(userJob.calledCancel) require.Equal(test.wantScheduler, test.scheduler) }) } } func TestScheduler_Fulfill(t *testing.T) { - var calledExecute bool - userJob := &testJob{ - execute: func(context.Context) error { - if calledExecute { - return errDuplicateInvocation - } - calledExecute = true - return nil - }, - cancel: func(context.Context) error { - return errUnexpectedInvocation - }, - } - + userJob := &testJob{} tests := []struct { name string scheduler *Scheduler[int] @@ -219,30 +204,18 @@ func TestScheduler_Fulfill(t *testing.T) { require := require.New(t) // Reset the variable between 
tests - calledExecute = false + userJob.reset() require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) - require.Equal(test.wantExecute, calledExecute) + require.Equal(test.wantExecute, userJob.calledExecute) + require.False(userJob.calledCancel) require.Equal(test.wantScheduler, test.scheduler) }) } } func TestScheduler_Abandon(t *testing.T) { - var calledCancel bool - userJob := &testJob{ - execute: func(context.Context) error { - return errUnexpectedInvocation - }, - cancel: func(context.Context) error { - if calledCancel { - return errDuplicateInvocation - } - calledCancel = true - return nil - }, - } - + userJob := &testJob{} tests := []struct { name string scheduler *Scheduler[int] @@ -285,10 +258,11 @@ func TestScheduler_Abandon(t *testing.T) { require := require.New(t) // Reset the variable between tests - calledCancel = false + userJob.reset() require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) - require.Equal(test.wantCancelled, calledCancel) + require.False(userJob.calledExecute) + require.Equal(test.wantCancelled, userJob.calledCancel) require.Equal(test.wantScheduler, test.scheduler) }) } From 4fe57ec59031a0c7b9256abb15a05626b722eedf Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 12:43:17 -0400 Subject: [PATCH 11/18] cleanup comments --- snow/engine/snowman/job/scheduler.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 1d62f0fac93..163f51c2c2d 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -72,7 +72,7 @@ func (s *Scheduler[_]) NumDependencies() int { // Fulfill a dependency. If all dependencies for a job are fulfilled, the job // will be executed. // -// It is safe to call the queue during the execution of a job. +// It is safe to call the scheduler during the execution of a job. 
func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { return s.resolveDependency(ctx, dependency, false) } @@ -81,7 +81,7 @@ func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { // will be cancelled. The job will only be cancelled once all dependencies are // resolved. // -// It is safe to call the queue during the cancelling of a job. +// It is safe to call the scheduler during the cancelling of a job. func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error { return s.resolveDependency(ctx, dependency, true) } @@ -95,8 +95,6 @@ func (s *Scheduler[T]) resolveDependency( delete(s.dependents, dependency) for _, job := range jobs { - // Removing the dependency keeps the queue in a consistent state. - // However, it isn't strictly needed. job.dependencies.Remove(dependency) job.shouldCancel = shouldCancel || job.shouldCancel From cb01dd4cb5aab17f6c37922d24fdd756416cb4d1 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 14:28:48 -0400 Subject: [PATCH 12/18] Register -> Schedule --- snow/engine/snowman/job/scheduler.go | 9 +++++---- snow/engine/snowman/job/scheduler_test.go | 6 +++--- snow/engine/snowman/transitive.go | 6 +++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 163f51c2c2d..970c6fdfda3 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -41,14 +41,15 @@ func NewScheduler[T comparable]() *Scheduler[T] { } } -// Register a job that should be executed once all of its dependencies are -// fulfilled. In order to prevent a memory leak, all dependencies must -// eventually either be fulfilled or abandoned. +// Schedule a job to be executed once all of its dependencies are fulfilled. +// +// In order to prevent a memory leak, all dependencies must eventually either be +// fulfilled or abandoned. 
// // While registering a job with duplicate dependencies is discouraged, it is // allowed and treated similarly to registering the job with the dependencies // de-duplicated. -func (s *Scheduler[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { +func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job, dependencies ...T) error { if len(dependencies) == 0 { return userJob.Execute(ctx) } diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index 9e5169fd0b8..6f2278d478d 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -53,7 +53,7 @@ func newSchedulerWithJob[T comparable]( dependencies ...T, ) *Scheduler[T] { q := NewScheduler[T]() - require.NoError(t, q.Register(context.Background(), job, dependencies...)) + require.NoError(t, q.Schedule(context.Background(), job, dependencies...)) if shouldCancel { for _, jobs := range q.dependents { for _, j := range jobs { @@ -64,7 +64,7 @@ func newSchedulerWithJob[T comparable]( return q } -func TestScheduler_Register(t *testing.T) { +func TestScheduler_Schedule(t *testing.T) { userJob := &testJob{} tests := []struct { name string @@ -151,7 +151,7 @@ func TestScheduler_Register(t *testing.T) { // Reset the variable between tests userJob.reset() - require.NoError(test.scheduler.Register(context.Background(), userJob, test.dependencies...)) + require.NoError(test.scheduler.Schedule(context.Background(), userJob, test.dependencies...)) require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) require.Equal(test.wantExecuted, userJob.calledExecute) require.False(userJob.calledCancel) diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 5a296910aaf..45be62565c9 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -406,7 +406,7 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin deps 
= append(deps, preferredIDAtHeight) } - if err := t.blocked.Register(ctx, v, deps...); err != nil { + if err := t.blocked.Schedule(ctx, v, deps...); err != nil { return err } return t.executeDeferredWork(ctx) @@ -423,7 +423,7 @@ func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, request nodeID: nodeID, requestID: requestID, } - if err := t.blocked.Register(ctx, v); err != nil { + if err := t.blocked.Schedule(ctx, v); err != nil { return err } return t.executeDeferredWork(ctx) @@ -858,7 +858,7 @@ func (t *Transitive) issue( deps = append(deps, parentID) } - return t.blocked.Register(ctx, i, deps...) + return t.blocked.Schedule(ctx, i, deps...) } // Request that [vdr] send us block [blkID] From 2cfef5726dc29ed31fd63f10da34973357c14995 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 14:31:07 -0400 Subject: [PATCH 13/18] nit --- snow/engine/snowman/job/scheduler_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index 6f2278d478d..f900447f16c 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -52,16 +52,16 @@ func newSchedulerWithJob[T comparable]( shouldCancel bool, dependencies ...T, ) *Scheduler[T] { - q := NewScheduler[T]() - require.NoError(t, q.Schedule(context.Background(), job, dependencies...)) + s := NewScheduler[T]() + require.NoError(t, s.Schedule(context.Background(), job, dependencies...)) if shouldCancel { - for _, jobs := range q.dependents { + for _, jobs := range s.dependents { for _, j := range jobs { j.shouldCancel = true } } } - return q + return s } func TestScheduler_Schedule(t *testing.T) { From 2cc7edd8510b664c321d541dac47f38973bb773b Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 18:07:24 -0400 Subject: [PATCH 14/18] Add regression test --- snow/engine/snowman/transitive_test.go | 131 
+++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index f055c8758f1..18a1d320495 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -2857,6 +2857,137 @@ func TestEngineVoteStallRegression(t *testing.T) { require.Equal(choices.Rejected, rejectedChain[0].Status()) } +// When a voter is registered with multiple dependencies, the engine must not +// execute the voter until all of the dependencies have been resolved; even if +// one of the dependencies has been abandoned. +func TestEngineEarlyTerminateVoterRegression(t *testing.T) { + require := require.New(t) + + config := DefaultConfig(t) + nodeID := ids.GenerateTestNodeID() + require.NoError(config.Validators.AddStaker(config.Ctx.SubnetID, nodeID, nil, ids.Empty, 1)) + + sender := &common.SenderTest{ + T: t, + SendChitsF: func(context.Context, ids.NodeID, uint32, ids.ID, ids.ID, ids.ID) {}, + } + sender.Default(true) + config.Sender = sender + + chain := snowmantest.BuildDescendants(snowmantest.Genesis, 3) + vm := &block.TestVM{ + TestVM: common.TestVM{ + T: t, + InitializeF: func( + context.Context, + *snow.Context, + database.Database, + []byte, + []byte, + []byte, + chan<- common.Message, + []*common.Fx, + common.AppSender, + ) error { + return nil + }, + SetStateF: func(context.Context, snow.State) error { + return nil + }, + }, + ParseBlockF: MakeParseBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + chain, + ), + GetBlockF: MakeGetBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + ), + SetPreferenceF: func(context.Context, ids.ID) error { + return nil + }, + LastAcceptedF: MakeLastAcceptedBlockF( + snowmantest.Genesis, + chain, + ), + } + vm.Default(true) + config.VM = vm + + engine, err := New(config) + require.NoError(err) + require.NoError(engine.Start(context.Background(), 0)) + + var pollRequestIDs []uint32 + sender.SendPullQueryF = func(_ 
context.Context, polledNodeIDs set.Set[ids.NodeID], requestID uint32, _ ids.ID, _ uint64) { + require.Equal(set.Of(nodeID), polledNodeIDs) + pollRequestIDs = append(pollRequestIDs, requestID) + } + + getRequestIDs := make(map[ids.ID]uint32) + sender.SendGetF = func(_ context.Context, requestedNodeID ids.NodeID, requestID uint32, blkID ids.ID) { + require.Equal(nodeID, requestedNodeID) + getRequestIDs[blkID] = requestID + } + + // Issue block 0 to trigger poll 0. + require.NoError(engine.PushQuery( + context.Background(), + nodeID, + 0, + chain[0].Bytes(), + 0, + )) + require.Len(pollRequestIDs, 1) + require.Empty(getRequestIDs) + + // Update GetBlock to return, the newly issued, block 0. This is needed to + // enable the issuance of block 1. + vm.GetBlockF = MakeGetBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + chain[:1], + ) + + // Vote for block 2 or block 1 in poll 0. This should trigger Get requests + // for both block 2 and block 1. + require.NoError(engine.Chits( + context.Background(), + nodeID, + pollRequestIDs[0], + chain[2].ID(), + chain[1].ID(), + snowmantest.GenesisID, + )) + require.Len(pollRequestIDs, 1) + require.Contains(getRequestIDs, chain[1].ID()) + require.Contains(getRequestIDs, chain[2].ID()) + + // Mark the request for block 2 as failed. This should not cause the poll to + // be applied as there is still an outstanding request for block 1. + require.NoError(engine.GetFailed( + context.Background(), + nodeID, + getRequestIDs[chain[2].ID()], + )) + require.Len(pollRequestIDs, 1) + + // Issue block 1. This should cause the poll to be applied to both block 0 + // and block 1. + require.NoError(engine.Put( + context.Background(), + nodeID, + getRequestIDs[chain[1].ID()], + chain[1].Bytes(), + )) + // Because Put added a new preferred block to the chain, a new poll will be + // created. 
+ require.Len(pollRequestIDs, 2) + require.Equal(choices.Accepted, chain[0].Status()) + require.Equal(choices.Accepted, chain[1].Status()) + // Block 2 still hasn't been issued, so it's status should remain + // Processing. + require.Equal(choices.Processing, chain[2].Status()) +} + func TestGetProcessingAncestor(t *testing.T) { var ( ctx = snowtest.ConsensusContext( From 61a5a2779d797c68b26cd7c7afcac22c86b4bba4 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 18:07:36 -0400 Subject: [PATCH 15/18] Remove `Cancel` from the `Job` interface (#3127) --- snow/engine/snowman/issuer.go | 13 +- snow/engine/snowman/job/scheduler.go | 84 ++++----- snow/engine/snowman/job/scheduler_test.go | 197 +++++++++++++++------- snow/engine/snowman/voter.go | 12 +- 4 files changed, 182 insertions(+), 124 deletions(-) diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index efb23a90b5e..9af5fb9716a 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/job" ) -var _ job.Job = (*issuer)(nil) +var _ job.Job[ids.ID] = (*issuer)(nil) // issuer issues [blk] into to consensus after its dependencies are met. type issuer struct { @@ -24,11 +24,14 @@ type issuer struct { issuedMetric prometheus.Counter } -func (i *issuer) Execute(ctx context.Context) error { - return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) -} +func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error { + if len(abandoned) == 0 { + // If the parent block wasn't abandoned, this block can be issued. + return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) + } -func (i *issuer) Cancel(ctx context.Context) error { + // If the parent block was abandoned, this block should be abandoned as + // well. 
blkID := i.blk.ID() i.t.removeFromPending(i.blk) i.t.addToNonVerifieds(i.blk) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 970c6fdfda3..bba746f9662 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -5,31 +5,25 @@ // dependencies. package job -import ( - "context" +import "context" - "github.com/ava-labs/avalanchego/utils/set" -) - -// Job is a unit of work that can be executed or cancelled. -type Job interface { - Execute(context.Context) error - Cancel(context.Context) error +// Job is a unit of work that can be executed based on the result of resolving +// requested dependencies. +type Job[T any] interface { + Execute(ctx context.Context, fulfilled []T, abandoned []T) error } type job[T comparable] struct { - // If empty, the job is ready to be executed. - dependencies set.Set[T] - // If true, the job should be cancelled. - shouldCancel bool - // If nil, the job has already been executed or cancelled. - job Job + // Once all dependencies are resolved, the job will be executed. + numUnresolved int + fulfilled []T + abandoned []T + job Job[T] } // Scheduler implements a dependency graph for jobs. Jobs can be registered with -// dependencies, and once all dependencies are fulfilled, the job will be -// executed. If any of the dependencies are abandoned, the job will be -// cancelled. +// dependencies, and once all dependencies are resolved, the job will be +// executed. type Scheduler[T comparable] struct { // dependents maps a dependency to the jobs that depend on it. dependents map[T][]*job[T] @@ -41,22 +35,22 @@ func NewScheduler[T comparable]() *Scheduler[T] { } } -// Schedule a job to be executed once all of its dependencies are fulfilled. +// Schedule a job to be executed once all of its dependencies are resolved. // // In order to prevent a memory leak, all dependencies must eventually either be // fulfilled or abandoned. 
// // While registering a job with duplicate dependencies is discouraged, it is -// allowed and treated similarly to registering the job with the dependencies -// de-duplicated. -func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job, dependencies ...T) error { - if len(dependencies) == 0 { - return userJob.Execute(ctx) +// allowed. +func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error { + numUnresolved := len(dependencies) + if numUnresolved == 0 { + return userJob.Execute(ctx, nil, nil) } j := &job[T]{ - dependencies: set.Of(dependencies...), - job: userJob, + numUnresolved: numUnresolved, + job: userJob, } for _, d := range dependencies { s.dependents[d] = append(s.dependents[d], j) @@ -70,51 +64,43 @@ func (s *Scheduler[_]) NumDependencies() int { return len(s.dependents) } -// Fulfill a dependency. If all dependencies for a job are fulfilled, the job +// Fulfill a dependency. If all dependencies for a job are resolved, the job // will be executed. // // It is safe to call the scheduler during the execution of a job. func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { - return s.resolveDependency(ctx, dependency, false) + return s.resolveDependency(ctx, dependency, true) } -// Abandon a dependency. If any dependencies for a job are abandoned, the job -// will be cancelled. The job will only be cancelled once all dependencies are -// resolved. +// Abandon a dependency. If all dependencies for a job are resolved, the job +// will be executed. // -// It is safe to call the scheduler during the cancelling of a job. +// It is safe to call the scheduler during the execution of a job. 
func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error { - return s.resolveDependency(ctx, dependency, true) + return s.resolveDependency(ctx, dependency, false) } func (s *Scheduler[T]) resolveDependency( ctx context.Context, dependency T, - shouldCancel bool, + fulfilled bool, ) error { jobs := s.dependents[dependency] delete(s.dependents, dependency) for _, job := range jobs { - job.dependencies.Remove(dependency) - job.shouldCancel = shouldCancel || job.shouldCancel + job.numUnresolved-- + if fulfilled { + job.fulfilled = append(job.fulfilled, dependency) + } else { + job.abandoned = append(job.abandoned, dependency) + } - userJob := job.job - if userJob == nil || job.dependencies.Len() != 0 { + if job.numUnresolved > 0 { continue } - // Mark the job as handled so that any reentrant calls do not interact - // with this job again. - job.job = nil - - var err error - if job.shouldCancel { - err = userJob.Cancel(ctx) - } else { - err = userJob.Execute(ctx) - } - if err != nil { + if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil { return err } } diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index f900447f16c..31bd7a379a2 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -9,8 +9,6 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/utils/set" ) const ( @@ -18,48 +16,44 @@ const ( depToNeglect ) -var errDuplicateInvocation = errors.New("job already handled") +var errDuplicateExecution = errors.New("job already executed") type testJob struct { calledExecute bool - calledCancel bool + fulfilled []int + abandoned []int } -func (j *testJob) Execute(context.Context) error { +func (j *testJob) Execute(_ context.Context, fulfilled []int, abandoned []int) error { if j.calledExecute { - return errDuplicateInvocation + return errDuplicateExecution } j.calledExecute = true - return nil 
-} - -func (j *testJob) Cancel(context.Context) error { - if j.calledCancel { - return errDuplicateInvocation - } - j.calledCancel = true + j.fulfilled = fulfilled + j.abandoned = abandoned return nil } func (j *testJob) reset() { j.calledExecute = false - j.calledCancel = false + j.fulfilled = nil + j.abandoned = nil } func newSchedulerWithJob[T comparable]( t *testing.T, - job Job, - shouldCancel bool, - dependencies ...T, + job Job[T], + dependencies []T, + fulfilled []T, + abandoned []T, ) *Scheduler[T] { s := NewScheduler[T]() require.NoError(t, s.Schedule(context.Background(), job, dependencies...)) - if shouldCancel { - for _, jobs := range s.dependents { - for _, j := range jobs { - j.shouldCancel = true - } - } + for _, d := range fulfilled { + require.NoError(t, s.Fulfill(context.Background(), d)) + } + for _, d := range abandoned { + require.NoError(t, s.Abandon(context.Background(), d)) } return s } @@ -92,8 +86,10 @@ func TestScheduler_Schedule(t *testing.T) { dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -109,14 +105,18 @@ func TestScheduler_Schedule(t *testing.T) { dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve, depToNeglect), - job: userJob, + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, depToNeglect: { { - dependencies: set.Of(depToResolve, depToNeglect), - job: userJob, + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -124,7 +124,7 @@ func TestScheduler_Schedule(t *testing.T) { }, { name: "additional dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), dependencies: []int{depToResolve}, wantExecuted: false, wantNumDependencies: 1, @@ -132,12 +132,16 @@ func TestScheduler_Schedule(t *testing.T) { 
dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -154,7 +158,8 @@ func TestScheduler_Schedule(t *testing.T) { require.NoError(test.scheduler.Schedule(context.Background(), userJob, test.dependencies...)) require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) require.Equal(test.wantExecuted, userJob.calledExecute) - require.False(userJob.calledCancel) + require.Empty(userJob.fulfilled) + require.Empty(userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } @@ -165,37 +170,68 @@ func TestScheduler_Fulfill(t *testing.T) { tests := []struct { name string scheduler *Scheduler[int] - wantExecute bool + wantExecuted bool + wantFulfilled []int + wantAbandoned []int wantScheduler *Scheduler[int] }{ { name: "no jobs", scheduler: NewScheduler[int](), - wantExecute: false, + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, wantScheduler: NewScheduler[int](), }, { name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), - wantExecute: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: []int{depToResolve}, + wantAbandoned: nil, wantScheduler: NewScheduler[int](), }, { name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), - wantExecute: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, 
userJob, false, depToResolve, depToNeglect), - wantExecute: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: []int{depToResolve}, + abandoned: nil, + job: userJob, + }, + }, + }, + }, }, { name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), - wantExecute: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: []int{depToResolve, depToResolve}, + wantAbandoned: nil, + wantScheduler: NewScheduler[int](), + }, + { + name: "previously abandoned", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, []int{depToNeglect}), + wantExecuted: true, + wantFulfilled: []int{depToResolve}, + wantAbandoned: []int{depToNeglect}, wantScheduler: NewScheduler[int](), }, } @@ -207,8 +243,9 @@ func TestScheduler_Fulfill(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) - require.Equal(test.wantExecute, userJob.calledExecute) - require.False(userJob.calledCancel) + require.Equal(test.wantExecuted, userJob.calledExecute) + require.Equal(test.wantFulfilled, userJob.fulfilled) + require.Equal(test.wantAbandoned, userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } @@ -219,37 +256,68 @@ func TestScheduler_Abandon(t *testing.T) { tests := []struct { name string scheduler *Scheduler[int] - wantCancelled bool + wantExecuted bool + wantFulfilled []int + wantAbandoned []int wantScheduler *Scheduler[int] }{ { name: "no jobs", scheduler: NewScheduler[int](), - wantCancelled: false, + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, 
wantScheduler: NewScheduler[int](), }, { name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), - wantCancelled: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: nil, + wantAbandoned: []int{depToResolve}, wantScheduler: NewScheduler[int](), }, { name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), - wantCancelled: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToNeglect), - wantCancelled: false, - wantScheduler: newSchedulerWithJob(t, userJob, true, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: nil, + abandoned: []int{depToResolve}, + job: userJob, + }, + }, + }, + }, }, { name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), - wantCancelled: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: nil, + wantAbandoned: []int{depToResolve, depToResolve}, + wantScheduler: NewScheduler[int](), + }, + { + name: "previously fulfilled", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, []int{depToNeglect}, nil), + wantExecuted: true, + wantFulfilled: []int{depToNeglect}, + wantAbandoned: []int{depToResolve}, wantScheduler: NewScheduler[int](), }, } 
@@ -261,8 +329,9 @@ func TestScheduler_Abandon(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) - require.False(userJob.calledExecute) - require.Equal(test.wantCancelled, userJob.calledCancel) + require.Equal(test.wantExecuted, userJob.calledExecute) + require.Equal(test.wantFulfilled, userJob.fulfilled) + require.Equal(test.wantAbandoned, userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index 4332e969dad..c57a1c73355 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/avalanchego/utils/bag" ) -var _ job.Job = (*voter)(nil) +var _ job.Job[ids.ID] = (*voter)(nil) // Voter records chits received from [nodeID] once its dependencies are met. type voter struct { @@ -23,7 +23,11 @@ type voter struct { responseOptions []ids.ID } -func (v *voter) Execute(ctx context.Context) error { +// The resolution results from the dependencies of the voter aren't explicitly +// used. The responseOptions are used to determine which block to apply the vote +// to. The dependencies are only used to optimistically delay the application of +// the vote until the blocks have been issued. 
+func (v *voter) Execute(ctx context.Context, _ []ids.ID, _ []ids.ID) error { var ( vote ids.ID shouldVote bool @@ -74,7 +78,3 @@ func (v *voter) Execute(ctx context.Context) error { v.t.repoll(ctx) return nil } - -func (v *voter) Cancel(ctx context.Context) error { - return v.Execute(ctx) -} From 8935259e5d64756ac5373855a4f6130f87748da2 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 19 Jun 2024 12:57:24 -0400 Subject: [PATCH 16/18] want -> expected for uniformity --- snow/engine/snowman/job/scheduler_test.go | 250 +++++++++++----------- 1 file changed, 125 insertions(+), 125 deletions(-) diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index 31bd7a379a2..db6502c5f74 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -61,28 +61,28 @@ func newSchedulerWithJob[T comparable]( func TestScheduler_Schedule(t *testing.T) { userJob := &testJob{} tests := []struct { - name string - scheduler *Scheduler[int] - dependencies []int - wantExecuted bool - wantNumDependencies int - wantScheduler *Scheduler[int] + name string + scheduler *Scheduler[int] + dependencies []int + expectedExecuted bool + expectedNumDependencies int + expectedScheduler *Scheduler[int] }{ { - name: "no dependencies", - scheduler: NewScheduler[int](), - dependencies: nil, - wantExecuted: true, - wantNumDependencies: 0, - wantScheduler: NewScheduler[int](), + name: "no dependencies", + scheduler: NewScheduler[int](), + dependencies: nil, + expectedExecuted: true, + expectedNumDependencies: 0, + expectedScheduler: NewScheduler[int](), }, { - name: "one dependency", - scheduler: NewScheduler[int](), - dependencies: []int{depToResolve}, - wantExecuted: false, - wantNumDependencies: 1, - wantScheduler: &Scheduler[int]{ + name: "one dependency", + scheduler: NewScheduler[int](), + dependencies: []int{depToResolve}, + expectedExecuted: false, + expectedNumDependencies: 1, + expectedScheduler: 
&Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -96,12 +96,12 @@ func TestScheduler_Schedule(t *testing.T) { }, }, { - name: "two dependencies", - scheduler: NewScheduler[int](), - dependencies: []int{depToResolve, depToNeglect}, - wantExecuted: false, - wantNumDependencies: 2, - wantScheduler: &Scheduler[int]{ + name: "two dependencies", + scheduler: NewScheduler[int](), + dependencies: []int{depToResolve, depToNeglect}, + expectedExecuted: false, + expectedNumDependencies: 2, + expectedScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -123,12 +123,12 @@ func TestScheduler_Schedule(t *testing.T) { }, }, { - name: "additional dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), - dependencies: []int{depToResolve}, - wantExecuted: false, - wantNumDependencies: 1, - wantScheduler: &Scheduler[int]{ + name: "additional dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + dependencies: []int{depToResolve}, + expectedExecuted: false, + expectedNumDependencies: 1, + expectedScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToResolve: { { @@ -156,11 +156,11 @@ func TestScheduler_Schedule(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Schedule(context.Background(), userJob, test.dependencies...)) - require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) - require.Equal(test.wantExecuted, userJob.calledExecute) + require.Equal(test.expectedNumDependencies, test.scheduler.NumDependencies()) + require.Equal(test.expectedExecuted, userJob.calledExecute) require.Empty(userJob.fulfilled) require.Empty(userJob.abandoned) - require.Equal(test.wantScheduler, test.scheduler) + require.Equal(test.expectedScheduler, test.scheduler) }) } } @@ -168,44 +168,44 @@ func TestScheduler_Schedule(t *testing.T) { func TestScheduler_Fulfill(t *testing.T) { userJob := &testJob{} tests := []struct { - name string - 
scheduler *Scheduler[int] - wantExecuted bool - wantFulfilled []int - wantAbandoned []int - wantScheduler *Scheduler[int] + name string + scheduler *Scheduler[int] + expectedExecuted bool + expectedFulfilled []int + expectedAbandoned []int + expectedScheduler *Scheduler[int] }{ { - name: "no jobs", - scheduler: NewScheduler[int](), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - wantScheduler: NewScheduler[int](), + name: "no jobs", + scheduler: NewScheduler[int](), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), }, { - name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), - wantExecuted: true, - wantFulfilled: []int{depToResolve}, - wantAbandoned: nil, - wantScheduler: NewScheduler[int](), + name: "single dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: []int{depToResolve}, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), }, { - name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + name: "non-existent dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { - name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - wantScheduler: &Scheduler[int]{ + name: "incomplete dependencies", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + 
expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToNeglect: { { @@ -219,20 +219,20 @@ func TestScheduler_Fulfill(t *testing.T) { }, }, { - name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), - wantExecuted: true, - wantFulfilled: []int{depToResolve, depToResolve}, - wantAbandoned: nil, - wantScheduler: NewScheduler[int](), + name: "duplicate dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: []int{depToResolve, depToResolve}, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), }, { - name: "previously abandoned", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, []int{depToNeglect}), - wantExecuted: true, - wantFulfilled: []int{depToResolve}, - wantAbandoned: []int{depToNeglect}, - wantScheduler: NewScheduler[int](), + name: "previously abandoned", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, []int{depToNeglect}), + expectedExecuted: true, + expectedFulfilled: []int{depToResolve}, + expectedAbandoned: []int{depToNeglect}, + expectedScheduler: NewScheduler[int](), }, } for _, test := range tests { @@ -243,10 +243,10 @@ func TestScheduler_Fulfill(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) - require.Equal(test.wantExecuted, userJob.calledExecute) - require.Equal(test.wantFulfilled, userJob.fulfilled) - require.Equal(test.wantAbandoned, userJob.abandoned) - require.Equal(test.wantScheduler, test.scheduler) + require.Equal(test.expectedExecuted, userJob.calledExecute) + require.Equal(test.expectedFulfilled, userJob.fulfilled) + require.Equal(test.expectedAbandoned, userJob.abandoned) + require.Equal(test.expectedScheduler, test.scheduler) }) } } @@ 
-254,44 +254,44 @@ func TestScheduler_Fulfill(t *testing.T) { func TestScheduler_Abandon(t *testing.T) { userJob := &testJob{} tests := []struct { - name string - scheduler *Scheduler[int] - wantExecuted bool - wantFulfilled []int - wantAbandoned []int - wantScheduler *Scheduler[int] + name string + scheduler *Scheduler[int] + expectedExecuted bool + expectedFulfilled []int + expectedAbandoned []int + expectedScheduler *Scheduler[int] }{ { - name: "no jobs", - scheduler: NewScheduler[int](), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - wantScheduler: NewScheduler[int](), + name: "no jobs", + scheduler: NewScheduler[int](), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), }, { - name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), - wantExecuted: true, - wantFulfilled: nil, - wantAbandoned: []int{depToResolve}, - wantScheduler: NewScheduler[int](), + name: "single dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: nil, + expectedAbandoned: []int{depToResolve}, + expectedScheduler: NewScheduler[int](), }, { - name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + name: "non-existent dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { - name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), - wantExecuted: false, - wantFulfilled: nil, - wantAbandoned: nil, - 
wantScheduler: &Scheduler[int]{ + name: "incomplete dependencies", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: &Scheduler[int]{ dependents: map[int][]*job[int]{ depToNeglect: { { @@ -305,20 +305,20 @@ func TestScheduler_Abandon(t *testing.T) { }, }, { - name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), - wantExecuted: true, - wantFulfilled: nil, - wantAbandoned: []int{depToResolve, depToResolve}, - wantScheduler: NewScheduler[int](), + name: "duplicate dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: nil, + expectedAbandoned: []int{depToResolve, depToResolve}, + expectedScheduler: NewScheduler[int](), }, { - name: "previously fulfilled", - scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, []int{depToNeglect}, nil), - wantExecuted: true, - wantFulfilled: []int{depToNeglect}, - wantAbandoned: []int{depToResolve}, - wantScheduler: NewScheduler[int](), + name: "previously fulfilled", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, []int{depToNeglect}, nil), + expectedExecuted: true, + expectedFulfilled: []int{depToNeglect}, + expectedAbandoned: []int{depToResolve}, + expectedScheduler: NewScheduler[int](), }, } for _, test := range tests { @@ -329,10 +329,10 @@ func TestScheduler_Abandon(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) - require.Equal(test.wantExecuted, userJob.calledExecute) - require.Equal(test.wantFulfilled, userJob.fulfilled) - require.Equal(test.wantAbandoned, userJob.abandoned) - require.Equal(test.wantScheduler, test.scheduler) + require.Equal(test.expectedExecuted, userJob.calledExecute) + 
require.Equal(test.expectedFulfilled, userJob.fulfilled) + require.Equal(test.expectedAbandoned, userJob.abandoned) + require.Equal(test.expectedScheduler, test.scheduler) }) } } From 83ea62b0307e90f6ae964fe861bf18fcc8594b1b Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 19 Jun 2024 14:34:43 -0400 Subject: [PATCH 17/18] nits --- snow/engine/snowman/job/scheduler.go | 2 ++ snow/engine/snowman/transitive.go | 1 + 2 files changed, 3 insertions(+) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index bba746f9662..a40085032b8 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -37,6 +37,8 @@ func NewScheduler[T comparable]() *Scheduler[T] { // Schedule a job to be executed once all of its dependencies are resolved. // +// If a job is scheduled with no dependencies, it's executed immediately. +// // In order to prevent a memory leak, all dependencies must eventually either be // fulfilled or abandoned. 
// diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 45be62565c9..680cb9c5e09 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -537,6 +537,7 @@ func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) { zap.Uint32("requestID", t.requestID), zap.Stringer("polls", t.polls), zap.Reflect("outstandingBlockRequests", t.blkReqs), + zap.Int("numMissingDependencies", t.blocked.NumDependencies()), zap.Int("pendingBuildBlocks", t.pendingBuildBlocks), ) From f96fae7172e9749f6e087ecb22dcdf0d5b4b6a92 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 19 Jun 2024 14:34:58 -0400 Subject: [PATCH 18/18] nits --- snow/engine/snowman/job/scheduler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index a40085032b8..e05f27130de 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -35,9 +35,8 @@ func NewScheduler[T comparable]() *Scheduler[T] { } } -// Schedule a job to be executed once all of its dependencies are resolved. -// -// If a job is scheduled with no dependencies, it's executed immediately. +// Schedule a job to be executed once all of its dependencies are resolved. If a +// job is scheduled with no dependencies, it's executed immediately. // // In order to prevent a memory leak, all dependencies must eventually either be // fulfilled or abandoned.