49 changes: 25 additions & 24 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
@@ -139,8 +139,6 @@ type ReplicaChangeType int

const (
Unknown ReplicaChangeType = iota
- AddLease
- RemoveLease
// AddReplica represents a single replica being added.
AddReplica
// RemoveReplica represents a single replica being removed.
@@ -154,10 +152,6 @@ func (s ReplicaChangeType) String() string {
switch s {
case Unknown:
return "Unknown"
- case AddLease:
- return "AddLease"
- case RemoveLease:
- return "RemoveLease"
case AddReplica:
return "AddReplica"
case RemoveReplica:
@@ -201,13 +195,15 @@ type ReplicaChange struct {
// AddReplica.
//
// - exists(prev.replicaID) && exists(next.replicaID):
- //   - If prev.ReplicaType == next.ReplicaType, ReplicaChangeType must be
- //     AddLease or RemoveLease, with a change in the IsLeaseholder bit.
- //   - If prev.ReplicaType != next.ReplicaType, ReplicaChangeType is
- //     ChangeReplica, and this is a promotion/demotion. The IsLeaseholder
- //     bit can change or be false in both prev and next (it can't be true in
- //     both since a promoted replica can't have been the leaseholder and a
- //     replica being demoted cannot retain the lease).
+ //   - ReplicaChangeType must be ChangeReplica.
+ //   - If prev.ReplicaType == next.ReplicaType, there must be a change in the
+ //     IsLeaseholder bit - this is a pure lease transfer.
+ //   - If prev.ReplicaType != next.ReplicaType, this is a promotion/demotion.
+ //     Additionally, either prev.IsLeaseholder or next.IsLeaseholder may be
+ //     set, indicating that the lease is being lost/gained as part of the
+ //     demotion/promotion. It can't be true in both, since a promoted replica
+ //     can't have been the leaseholder and a replica being demoted cannot
+ //     retain the lease.
//
// NB: The prev value is always the state before the change. This is the
// source of truth provided by the leaseholder in the RangeMsg, so will
Expand Down Expand Up @@ -253,14 +249,21 @@ func (rc ReplicaChange) isAddition() bool {
// isUpdate returns true if the change is an update to the replica type or
// leaseholder status. This includes promotion/demotion changes.
func (rc ReplicaChange) isUpdate() bool {
- return rc.replicaChangeType == AddLease || rc.replicaChangeType == RemoveLease ||
- rc.replicaChangeType == ChangeReplica
+ return rc.replicaChangeType == ChangeReplica
}

// isPromoDemo returns true if the change is a promotion or demotion of a
// replica (potentially with a lease change).
func (rc ReplicaChange) isPromoDemo() bool {
- return rc.replicaChangeType == ChangeReplica
+ // A ChangeReplica is a promo/demo if it changes the replica type
+ // (distinguishing it from a pure lease transfer).
+ return rc.replicaChangeType == ChangeReplica && rc.prev.ReplicaType.ReplicaType != rc.next.ReplicaType.ReplicaType
}
+
+ // isPureLeaseTransfer returns true if the change is a pure lease transfer,
+ // i.e. not one coupled with a change in replica type.
+ func (rc ReplicaChange) isPureLeaseTransfer() bool {
+ return rc.replicaChangeType == ChangeReplica && rc.prev.ReplicaType.ReplicaType == rc.next.ReplicaType.ReplicaType
+ }
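
To make the new classification concrete, the following is a minimal, runnable sketch using hypothetical flattened stand-in types (the real ReplicaChange nests prev/next state and also checks replicaChangeType): a pure lease transfer and a promotion are both ChangeReplica, told apart only by whether the replica type changes.

package main

import "fmt"

// Hypothetical, flattened stand-ins for ReplicaChange's prev/next state.
type replicaType int

const (
	nonVoter replicaType = iota
	voterFull
)

type change struct {
	prevType, nextType   replicaType
	prevLease, nextLease bool
}

// Mirrors isPureLeaseTransfer: the type is unchanged, only the lease moves.
func (c change) isPureLeaseTransfer() bool {
	return c.prevType == c.nextType && c.prevLease != c.nextLease
}

// Mirrors isPromoDemo (simplified: the real predicate also requires
// replicaChangeType == ChangeReplica): the replica type itself changes.
func (c change) isPromoDemo() bool { return c.prevType != c.nextType }

func main() {
	transfer := change{prevType: voterFull, nextType: voterFull, prevLease: true, nextLease: false}
	promo := change{prevType: nonVoter, nextType: voterFull}
	fmt.Println(transfer.isPureLeaseTransfer(), transfer.isPromoDemo()) // true false
	fmt.Println(promo.isPureLeaseTransfer(), promo.isPromoDemo())      // false true
}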

func MakeLeaseTransferChanges(
@@ -308,14 +311,14 @@ func MakeLeaseTransferChanges(
rangeID: rangeID,
prev: remove.ReplicaState,
next: remove.ReplicaIDAndType,
- replicaChangeType: RemoveLease,
+ replicaChangeType: ChangeReplica,
}
addLease := ReplicaChange{
target: addTarget,
rangeID: rangeID,
prev: add.ReplicaState,
next: add.ReplicaIDAndType,
- replicaChangeType: AddLease,
+ replicaChangeType: ChangeReplica,
}
removeLease.next.IsLeaseholder = false
addLease.next.IsLeaseholder = true
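
For illustration, here is a minimal sketch (hypothetical simplified types, not the real ReplicaChange or the MakeLeaseTransferChanges signature) of the pair of changes a lease transfer now produces: both legs are ChangeReplica, and only the IsLeaseholder bit distinguishes them.

package main

import "fmt"

// Hypothetical, simplified model of the two changes built by
// MakeLeaseTransferChanges: the outgoing leaseholder clears IsLeaseholder,
// the incoming one sets it, and neither leg changes the replica type.
type leaseLeg struct {
	storeID       int
	isLeaseholder bool // the next (post-change) lease state
}

func makeLeaseTransferChanges(fromStore, toStore int) [2]leaseLeg {
	removeLease := leaseLeg{storeID: fromStore, isLeaseholder: false}
	addLease := leaseLeg{storeID: toStore, isLeaseholder: true}
	return [2]leaseLeg{removeLease, addLease}
}

// Mirrors the IsTransferLease invariant: exactly two legs, the lease cleared
// on one side and set on the other.
func isTransferLease(pair [2]leaseLeg) bool {
	return !pair[0].isLeaseholder && pair[1].isLeaseholder
}

func main() {
	pair := makeLeaseTransferChanges(1, 2)
	fmt.Printf("%+v %v\n", pair, isTransferLease(pair)) // [{storeID:1 isLeaseholder:false} {storeID:2 isLeaseholder:true}] true
}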
@@ -583,8 +586,9 @@ func (prc PendingRangeChange) IsChangeReplicas() bool {
return true
}

- // IsTransferLease returns true if the pending range change is a transfer lease
- // operation.
+ // IsTransferLease returns true if the pending range change is a pure lease
+ // transfer, i.e. a change in leaseholder without any replica additions,
+ // removals, or type changes.
func (prc PendingRangeChange) IsTransferLease() bool {
if len(prc.pendingReplicaChanges) != 2 {
return false
@@ -629,10 +633,7 @@ func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges {
chgs := make([]kvpb.ReplicationChange, 0, len(prc.pendingReplicaChanges))
newLeaseholderIndex := -1
for _, c := range prc.pendingReplicaChanges {
- switch c.replicaChangeType {
- case ChangeReplica, AddReplica, RemoveReplica:
- // These are the only permitted cases.
- default:
+ if c.isPureLeaseTransfer() {
panic(errors.AssertionFailedf("change type %v is not a change replicas", c.replicaChangeType))
}
// The kvserver code represents a change in replica type as an
@@ -24,6 +24,11 @@ import (
var mmaid = atomic.Int64{}

// rebalanceEnv tracks the state and outcomes of a rebalanceStores invocation.
+ // Recall that such an invocation is on behalf of a local store, but will
+ // iterate over a slice of shedding stores - these are not necessarily local
+ // but are instead stores which are overloaded and need to shed load, and for
+ // which the particular calling local store has at least some leases (i.e. is
+ // capable of making decisions).
type rebalanceEnv struct {
*clusterState
// rng is the random number generator for non-deterministic decisions.
@@ -34,15 +39,20 @@ type rebalanceEnv struct {
changes []PendingRangeChange
// rangeMoveCount tracks the number of range moves made.
rangeMoveCount int
- // leaseTransferCount tracks the number of lease transfers made.
- leaseTransferCount int
- // maxRangeMoveCount is the maximum number of range moves allowed.
+ // maxRangeMoveCount is the maximum number of range moves allowed in the
+ // context of the current rebalanceStores invocation (across multiple
+ // shedding stores).
maxRangeMoveCount int
- // maxLeaseTransferCount is the maximum number of lease transfers allowed.
+ // maxLeaseTransferCount is the maximum number of lease transfers allowed in
+ // the context of the current rebalanceStores invocation. Note that because
+ // leases can only be shed from the particular local store on whose behalf
+ // rebalanceStores was called, this limit applies to that one store.
maxLeaseTransferCount int
// lastFailedChangeDelayDuration is the delay after a failed change before retrying.
lastFailedChangeDelayDuration time.Duration
- // now is the timestamp when rebalancing started.
+ // now is the timestamp representing the start time of the current
+ // rebalanceStores invocation.
now time.Time
// Scratch variables reused across iterations.
scratch struct {
@@ -59,6 +69,20 @@ type sheddingStore struct
storeLoadSummary
}

+ // The caller has a fixed concurrency limit it can move ranges at, when it
+ // is the sender of the snapshot. So we don't want to create too many
+ // changes, since then the allocator gets too far ahead of what has been
+ // enacted, and its decision-making is no longer based on recent
+ // information. We don't have this issue with lease transfers since they are
+ // very fast, so we set a much higher limit.
+ //
+ // TODO: revisit these constants.
+ const maxRangeMoveCount = 1
+ const maxLeaseTransferCount = 8
+
+ // See the long comment where rangeState.lastFailedChange is declared.
+ const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
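
As a rough sketch of how the range-move budget is consumed (hypothetical driver code, not the real rebalanceStores), the budget is shared across all shedding stores in one invocation, so iteration stops as soon as it is spent:

package main

import "fmt"

const (
	maxRangeMoveCount     = 1
	maxLeaseTransferCount = 8
)

// Hypothetical sketch: one rebalanceStores invocation walks the shedding
// stores and stops once the shared range-move budget is exhausted.
func rebalanceStores(sheddingStores []int) (moves int) {
	for _, storeID := range sheddingStores {
		if moves >= maxRangeMoveCount {
			fmt.Printf("reached max range move count %d, stopping further rebalancing\n", maxRangeMoveCount)
			break
		}
		// Pretend each shedding store yields one range move.
		fmt.Printf("moving one range off s%d\n", storeID)
		moves++
	}
	return moves
}

func main() {
	fmt.Println(rebalanceStores([]int{3, 7, 9})) // moves one range off s3, then stops: 1
}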

// Called periodically, say every 10s.
//
// We do not want to shed replicas for CPU from a remote store until it's had a
@@ -158,25 +182,10 @@ func (cs *clusterState) rebalanceStores(
}
}

- // The caller has a fixed concurrency limit it can move ranges at, when it
- // is the sender of the snapshot. So we don't want to create too many
- // changes, since then the allocator gets too far ahead of what has been
- // enacted, and its decision-making is no longer based on recent
- // information. We don't have this issue with lease transfers since they are
- // very fast, so we set a much higher limit.
- //
- // TODO: revisit these constants.
- const maxRangeMoveCount = 1
- const maxLeaseTransferCount = 8
- // See the long comment where rangeState.lastFailedChange is declared.
- const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
re := &rebalanceEnv{
clusterState: cs,
rng: rng,
dsm: dsm,
changes: []PendingRangeChange{},
- rangeMoveCount: 0,
- leaseTransferCount: 0,
maxRangeMoveCount: maxRangeMoveCount,
maxLeaseTransferCount: maxLeaseTransferCount,
lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
@@ -185,7 +194,8 @@
re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
re.scratch.stores = map[roachpb.StoreID]struct{}{}
for _, store := range sheddingStores {
- if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
+ if re.rangeMoveCount >= re.maxRangeMoveCount {
+ log.KvDistribution.VInfof(ctx, 2, "reached max range move count %d, stopping further rebalancing", re.maxRangeMoveCount)
break
}
re.rebalanceStore(ctx, store, localStoreID)
@@ -213,7 +223,7 @@ func (re *rebalanceEnv) rebalanceStore(
if !ss.adjusted.replicas[rangeID].IsLeaseholder {
load[CPURate] = rstate.load.RaftCPU
}
- fmt.Fprintf(&b, " r%d:%v", rangeID, load)
+ _, _ = fmt.Fprintf(&b, " r%d:%v", rangeID, load)
}
log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s",
topKRanges.dim, store.StoreID, localStoreID, b.String())
@@ -229,8 +239,15 @@
// behalf of a particular store (vs. being called on behalf of the set
// of local store IDs)?
if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
- shouldSkipReplicaMoves := re.rebalanceLeases(ctx, ss, store, localStoreID)
- if shouldSkipReplicaMoves {
+ if shouldSkipReplicaMoves := re.rebalanceLeases(ctx, ss, store, localStoreID); shouldSkipReplicaMoves {
+ // If we managed to transfer a lease, wait for it to be done before
+ // shedding replicas from this store (which is more costly). Otherwise
+ // we may needlessly start moving replicas. Note that the store
+ // rebalancer will call the rebalance method again after the lease
+ // transfer is done and we may still be considering those transfers as
+ // pending from a load perspective, so we *may* not be able to do more
+ // lease transfers -- so be it.
+ log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d", ss.StoreID)
return
}
} else {
@@ -245,7 +262,6 @@
func (re *rebalanceEnv) rebalanceReplicas(
ctx context.Context, store sheddingStore, ss *storeState, localStoreID roachpb.StoreID,
) {
doneShedding := false
if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
re.now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
@@ -272,6 +288,9 @@
n := topKRanges.len()
loadDim := topKRanges.dim
for i := 0; i < n; i++ {
+ if re.rangeMoveCount >= re.maxRangeMoveCount {
+ return
+ }
rangeID := topKRanges.index(i)
// TODO(sumeer): the following code belongs in a closure, since we will
// repeat it for some random selection of non topKRanges.
@@ -423,25 +442,17 @@
log.KvDistribution.VInfof(ctx, 2,
"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
- if re.rangeMoveCount >= re.maxRangeMoveCount {
- log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
+ if ss.maxFractionPendingDecrease >= maxFractionPendingThreshold {
+ log.KvDistribution.VInfof(ctx, 2,
+ "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing; done shedding",
+ store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold)
+ // TODO(sumeer): For regular rebalancing, we will wait until those top-K
+ // move and then continue with the rest. There is a risk that the top-K
+ // have some constraint that prevents rebalancing, while the rest can be
+ // moved. Running with underprovisioned clusters and expecting load-based
+ // rebalancing to work well is not in scope.
return
}
- doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
- if doneShedding {
- log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
- store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
- break
- }
}
- // TODO(sumeer): For regular rebalancing, we will wait until those top-K
- // move and then continue with the rest. There is a risk that the top-K
- // have some constraint that prevents rebalancing, while the rest can be
- // moved. Running with underprovisioned clusters and expecting load-based
- // rebalancing to work well is not in scope.
- if doneShedding {
- log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
- return
- }
}

@@ -454,10 +465,9 @@ func (re *rebalanceEnv) rebalanceLeases(
//
// NB: any ranges at this store that don't have pending changes must
// have this local store as the leaseholder.
- localLeaseTransferCount := 0
topKRanges := ss.adjusted.topKRanges[localStoreID]
n := topKRanges.len()
- doneShedding := false
+ var leaseTransferCount int
for i := 0; i < n; i++ {
rangeID := topKRanges.index(i)
rstate := re.ranges[rangeID]
@@ -596,9 +606,8 @@
}
re.addPendingRangeChange(leaseChange)
re.changes = append(re.changes, leaseChange)
- re.leaseTransferCount++
- localLeaseTransferCount++
- if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() {
+ leaseTransferCount++
+ if !re.changes[len(re.changes)-1].IsTransferLease() {
panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1]))
}
log.KvDistribution.Infof(ctx,
@@ -608,28 +617,15 @@
ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
- if re.leaseTransferCount >= re.maxLeaseTransferCount {
+ if leaseTransferCount >= re.maxLeaseTransferCount {
log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
- break
+ return true
}
- doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
- if doneShedding {
+ if ss.maxFractionPendingDecrease >= maxFractionPendingThreshold {
log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
- break
+ return true
}
}
- if doneShedding || localLeaseTransferCount > 0 {
- // If managed to transfer a lease, wait for it to be done, before
- // shedding replicas from this store (which is more costly). Otherwise
- // we may needlessly start moving replicas. Note that the store
- // rebalancer will call the rebalance method again after the lease
- // transfer is done and we may still be considering those transfers as
- // pending from a load perspective, so we *may* not be able to do more
- // lease transfers -- so be it.
- log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
- store.StoreID, doneShedding, localLeaseTransferCount)
- return true
- }
return false
}
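
A minimal sketch of the return contract above (hypothetical numbers and helper names, not the real implementation): rebalanceLeases reports true when the caller should skip replica moves for this store, either because the lease-transfer budget is exhausted or because enough load shedding is already pending.

package main

import "fmt"

const (
	maxLeaseTransferCount       = 8
	maxFractionPendingThreshold = 0.5
)

// Hypothetical sketch of rebalanceLeases' return value: true tells the
// caller to skip replica moves, either because the lease-transfer budget
// was exhausted or because enough load shedding is already pending.
func rebalanceLeases(candidates int, pendingDecreasePerTransfer float64) bool {
	transfers := 0
	pendingDecrease := 0.0
	for i := 0; i < candidates; i++ {
		transfers++
		pendingDecrease += pendingDecreasePerTransfer
		if transfers >= maxLeaseTransferCount {
			return true // budget exhausted: skip replica moves
		}
		if pendingDecrease >= maxFractionPendingThreshold {
			return true // done shedding for now: skip replica moves
		}
	}
	return false // little or nothing shed: replica moves may proceed
}

func main() {
	fmt.Println(rebalanceLeases(3, 0.3)) // true: threshold crossed on the 2nd transfer
	fmt.Println(rebalanceLeases(1, 0.1)) // false: one small transfer, keep going
}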