Skip to content

Commit

Permalink
Cleanup consensus metrics (#2815)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Mar 6, 2024
1 parent 6c76098 commit 66ae8ef
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 50 deletions.
5 changes: 0 additions & 5 deletions snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func (i *issuer) Abandon(ctx context.Context, _ ids.ID) {
i.t.removeFromPending(i.blk)
i.t.addToNonVerifieds(i.blk)
i.t.blocked.Abandon(ctx, blkID)

// Tracks performance statistics
i.t.metrics.numRequests.Set(float64(i.t.blkReqs.Len()))
i.t.metrics.numBlocked.Set(float64(len(i.t.pending)))
i.t.metrics.numBlockers.Set(float64(i.t.blocked.Len()))
}
i.abandoned = true
}
Expand Down
72 changes: 28 additions & 44 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (t *Transitive) Gossip(ctx context.Context) error {
return nil
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
lastAccepted, err := t.getBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
Expand Down Expand Up @@ -296,7 +296,7 @@ func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint3
if _, err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
return err
}
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
Expand All @@ -319,9 +319,7 @@ func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID

// Because the get request was dropped, we no longer expect blkID to be issued.
t.blocked.Abandon(ctx, blkID)
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkID ids.ID, requestedHeight uint64) error {
Expand All @@ -335,7 +333,7 @@ func (t *Transitive) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID
return err
}

return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte, requestedHeight uint64) error {
Expand Down Expand Up @@ -376,7 +374,7 @@ func (t *Transitive) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID
return err
}

return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredID ids.ID, preferredIDAtHeight ids.ID, acceptedID ids.ID) error {
Expand Down Expand Up @@ -432,8 +430,7 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin
}

t.blocked.Register(ctx, v)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
Expand All @@ -450,8 +447,7 @@ func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, request
requestID: requestID,
},
)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
}

func (*Transitive) Timeout(context.Context) error {
Expand All @@ -474,7 +470,7 @@ func (t *Transitive) Notify(ctx context.Context, msg common.Message) error {
case common.PendingTxs:
// the pending txs message means we should attempt to build a block.
t.pendingBuildBlocks++
return t.buildBlocks(ctx)
return t.executeDeferredWork(ctx)
case common.StateSyncDone:
t.Ctx.StateSyncing.Set(false)
return nil
Expand All @@ -497,7 +493,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
return err
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
lastAccepted, err := t.getBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Error("failed to get last accepted block",
zap.Error(err),
Expand Down Expand Up @@ -549,7 +545,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
return fmt.Errorf("failed to notify VM that consensus is starting: %w",
err)
}
return nil
return t.executeDeferredWork(ctx)
}

func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -580,7 +576,19 @@ func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) {
return intf, fmt.Errorf("vm: %w ; consensus: %w", vmErr, consensusErr)
}

func (t *Transitive) GetBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) {
// executeDeferredWork runs buildBlocks and, when that succeeds, refreshes the
// engine's gauges from the current sizes of the tracked collections
// (outstanding block requests, pending blocks, blockers, and non-verified
// blocks).
//
// If buildBlocks returns an error, that error is returned as-is and the gauges
// are left untouched.
func (t *Transitive) executeDeferredWork(ctx context.Context) error {
	buildErr := t.buildBlocks(ctx)
	if buildErr != nil {
		return buildErr
	}

	// Snapshot the collection sizes into their gauges in one place.
	var (
		outstandingRequests = float64(t.blkReqs.Len())
		pendingBlocks       = float64(len(t.pending))
		blockers            = float64(t.blocked.Len())
		nonVerifiedBlocks   = float64(t.nonVerifieds.Len())
	)
	t.metrics.numRequests.Set(outstandingRequests)
	t.metrics.numBlocked.Set(pendingBlocks)
	t.metrics.numBlockers.Set(blockers)
	t.metrics.numNonVerifieds.Set(nonVerifiedBlocks)
	return nil
}

func (t *Transitive) getBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) {
if blk, ok := t.pending[blkID]; ok {
return blk, nil
}
Expand Down Expand Up @@ -733,7 +741,7 @@ func (t *Transitive) issueFromByID(
blkID ids.ID,
issuedMetric prometheus.Counter,
) (bool, error) {
blk, err := t.GetBlock(ctx, blkID)
blk, err := t.getBlock(ctx, blkID)
if err != nil {
t.sendRequest(ctx, nodeID, blkID, issuedMetric)
return false, nil
Expand All @@ -759,7 +767,7 @@ func (t *Transitive) issueFrom(

blkID = blk.Parent()
var err error
blk, err = t.GetBlock(ctx, blkID)
blk, err = t.getBlock(ctx, blkID)

// If we don't have this ancestor, request it from [vdr]
if err != nil || !blk.Status().Fetched() {
Expand All @@ -780,10 +788,6 @@ func (t *Transitive) issueFrom(
// dependencies may still be waiting. Therefore, they should be abandoned.
t.blocked.Abandon(ctx, blkID)
}

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return issued, t.errs.Err
}

Expand All @@ -804,7 +808,7 @@ func (t *Transitive) issueWithAncestors(
return false, err
}
blkID = blk.Parent()
blk, err = t.GetBlock(ctx, blkID)
blk, err = t.getBlock(ctx, blkID)
if err != nil {
status = choices.Unknown
break
Expand All @@ -826,7 +830,6 @@ func (t *Transitive) issueWithAncestors(
// We don't have this block and have no reason to expect that we will get it.
// Abandon the block to avoid a memory leak.
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return false, t.errs.Err
}

Expand Down Expand Up @@ -869,7 +872,7 @@ func (t *Transitive) issue(

// block on the parent if needed
parentID := blk.Parent()
if parent, err := t.GetBlock(ctx, parentID); err != nil || !(t.Consensus.Decided(parent) || t.Consensus.Processing(parentID)) {
if parent, err := t.getBlock(ctx, parentID); err != nil || !(t.Consensus.Decided(parent) || t.Consensus.Processing(parentID)) {
t.Ctx.Log.Verbo("block waiting for parent to be issued",
zap.Stringer("blkID", blkID),
zap.Stringer("parentID", parentID),
Expand All @@ -878,11 +881,6 @@ func (t *Transitive) issue(
}

t.blocked.Register(ctx, i)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlocked.Set(float64(len(t.pending)))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand Down Expand Up @@ -912,9 +910,6 @@ func (t *Transitive) sendRequest(
zap.Stringer("blkID", blkID),
)
t.Sender.SendGet(ctx, nodeID, t.requestID, blkID)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
}

// Send a query for this block. If push is set to true, blkBytes will be used to
Expand Down Expand Up @@ -990,16 +985,14 @@ func (t *Transitive) deliver(
// longer pending
t.removeFromPending(blk)
parentID := blk.Parent()
parent, err := t.GetBlock(ctx, parentID)
parent, err := t.getBlock(ctx, parentID)
// Because the dependency must have been fulfilled by the time this function
// is called - we don't expect [err] to be non-nil. But it is handled for
// completeness and future proofing.
if err != nil || !(parent.Status() == choices.Accepted || t.Consensus.Processing(parentID)) {
// if the parent isn't processing or the last accepted block, then this
// block is effectively rejected
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlocked.Set(float64(len(t.pending))) // Tracks performance statistics
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand All @@ -1012,8 +1005,6 @@ func (t *Transitive) deliver(
}
if !blkAdded {
t.blocked.Abandon(ctx, blkID)
t.metrics.numBlocked.Set(float64(len(t.pending))) // Tracks performance statistics
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand Down Expand Up @@ -1077,11 +1068,6 @@ func (t *Transitive) deliver(

// If we should issue multiple queries at the same time, we need to repoll
t.repoll(ctx)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
t.metrics.numBlocked.Set(float64(len(t.pending)))
t.metrics.numBlockers.Set(float64(t.blocked.Len()))
return t.errs.Err
}

Expand All @@ -1108,7 +1094,6 @@ func (t *Transitive) addToNonVerifieds(blk snowman.Block) {
if t.nonVerifieds.Has(parentID) || t.Consensus.Processing(parentID) {
t.nonVerifieds.Add(blkID, parentID)
t.nonVerifiedCache.Put(blkID, blk)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
}
}

Expand Down Expand Up @@ -1140,7 +1125,6 @@ func (t *Transitive) addUnverifiedBlockToConsensus(
issuedMetric.Inc()
t.nonVerifieds.Remove(blkID)
t.nonVerifiedCache.Evict(blkID)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
t.metrics.issuerStake.Observe(float64(t.Validators.GetWeight(t.Ctx.SubnetID, nodeID)))
t.Ctx.Log.Verbo("adding block to consensus",
zap.Stringer("nodeID", nodeID),
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/voter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (v *voter) getProcessingAncestor(ctx context.Context, initialVote ids.ID) (
// have at our disposal as a best-effort mechanism to find a valid ancestor.
bubbledVote := v.t.nonVerifieds.GetAncestor(initialVote)
for {
blk, err := v.t.GetBlock(ctx, bubbledVote)
blk, err := v.t.getBlock(ctx, bubbledVote)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote",
Expand Down

0 comments on commit 66ae8ef

Please sign in to comment.