Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ func (a AllocatorAction) Remove() bool {
a == AllocatorRemoveDecommissioningNonVoter
}

// Decommissioning indicates an action replacing or removing a decommissioning
// replicas.
func (a AllocatorAction) Decommissioning() bool {
return a == AllocatorRemoveDecommissioningVoter ||
a == AllocatorRemoveDecommissioningNonVoter ||
a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorReplaceDecommissioningNonVoter
}

// TargetReplicaType returns that the action is for a voter or non-voter replica.
func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,16 @@ func (bq *baseQueue) addInternal(
item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority}
bq.addLocked(item)

// If adding this replica has pushed the queue past its maximum size,
// remove the lowest priority element.
// If adding this replica has pushed the queue past its maximum size, remove
// an element. Note that it might not be the lowest priority since heap is not
// guaranteed to be globally ordered. Ideally, we would remove the lowest
// priority element, but it would require additional bookkeeping or a linear
// scan.
if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
priority, replicaItemToDrop.replicaID)
bq.removeLocked(replicaItemToDrop)
}
// Signal the processLoop that a replica has been added.
select {
Expand Down
11 changes: 2 additions & 9 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2930,15 +2930,8 @@ func (r *Replica) maybeEnqueueProblemRange(
if !isLeaseholder || !leaseValid {
// The replicate queue will not process the replica without a valid lease.
// Track when we skip enqueuing for these reasons.
boolToInt := func(b bool) int {
if b {
return 1
}
return 0
}
reasons := []string{"is not the leaseholder", "the lease is not valid"}
reason := reasons[boolToInt(isLeaseholder)]
log.KvDistribution.VInfof(ctx, 1, "not enqueuing replica %s because %s", r.Desc(), reason)
log.KvDistribution.Infof(ctx, "not enqueuing replica %s because isLeaseholder=%t, leaseValid=%t",
r.Desc(), isLeaseholder, leaseValid)
r.store.metrics.DecommissioningNudgerNotLeaseholderOrInvalidLease.Inc(1)
return
}
Expand Down
9 changes: 1 addition & 8 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,19 +923,12 @@ func (rq *replicateQueue) processOneChange(
}

func maybeAnnotateDecommissionErr(err error, action allocatorimpl.AllocatorAction) error {
if err != nil && isDecommissionAction(action) {
if err != nil && action.Decommissioning() {
err = decommissionPurgatoryError{err}
}
return err
}

func isDecommissionAction(action allocatorimpl.AllocatorAction) bool {
return action == allocatorimpl.AllocatorRemoveDecommissioningVoter ||
action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter ||
action == allocatorimpl.AllocatorReplaceDecommissioningVoter ||
action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter
}

// shedLease takes in a leaseholder replica, looks for a target for transferring
// the lease and, if a suitable target is found (e.g. alive, not draining),
// transfers the lease away.
Expand Down