From f945c0daf1afa19242ba487d4198d37854e28e0a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:21:11 +0100 Subject: [PATCH 01/11] mmaprototype: indent future shedStore wrapper This is step 1 in extracting the shedStore logic into a standalone function. --- .../cluster_state_rebalance_stores.go | 706 +++++++++--------- 1 file changed, 354 insertions(+), 352 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index ecb974409ca6..065193c76c89 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -147,72 +147,263 @@ func (cs *clusterState) rebalanceStores( rangeMoveCount := 0 leaseTransferCount := 0 for idx /*logging only*/, store := range sheddingStores { - log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", - store.StoreID, store.nls, store.sls, store.worstDim) - ss := cs.stores[store.StoreID] + { + log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", + store.StoreID, store.nls, store.sls, store.worstDim) + ss := cs.stores[store.StoreID] - doneShedding := false - if true { - // Debug logging. - topKRanges := ss.adjusted.topKRanges[localStoreID] - n := topKRanges.len() - if n > 0 { - var b strings.Builder + doneShedding := false + if true { + // Debug logging. + topKRanges := ss.adjusted.topKRanges[localStoreID] + n := topKRanges.len() + if n > 0 { + var b strings.Builder + for i := 0; i < n; i++ { + rangeID := topKRanges.index(i) + rstate := cs.ranges[rangeID] + load := rstate.load.Load + if !ss.adjusted.replicas[rangeID].IsLeaseholder { + load[CPURate] = rstate.load.RaftCPU + } + fmt.Fprintf(&b, " r%d:%v", rangeID, load) + } + log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s", + topKRanges.dim, store.StoreID, localStoreID, b.String()) + } else { + log.KvDistribution.Infof(ctx, "no top-K[%s] ranges found for s%d with lease on local s%d", + topKRanges.dim, store.StoreID, localStoreID) + } + } + + // TODO(tbg): it's somewhat akward that we only enter this branch for + // ss.StoreID == localStoreID and not for *any* calling local store. + // More generally, does it make sense that rebalanceStores is called on + // behalf of a particular store (vs. being called on behalf of the set + // of local store IDs)? + if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow { + log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first", + store.StoreID, store.dimSummary[CPURate], overloadSlow) + // This store is local, and cpu overloaded. Shed leases first. + // + // NB: any ranges at this store that don't have pending changes must + // have this local store as the leaseholder. + topKRanges := ss.adjusted.topKRanges[localStoreID] + n := topKRanges.len() for i := 0; i < n; i++ { rangeID := topKRanges.index(i) rstate := cs.ranges[rangeID] - load := rstate.load.Load - if !ss.adjusted.replicas[rangeID].IsLeaseholder { - load[CPURate] = rstate.load.RaftCPU + if len(rstate.pendingChanges) > 0 { + // If the range has pending changes, don't make more changes. + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) + continue + } + for _, repl := range rstate.replicas { + if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID + continue + } + if !repl.IsLeaseholder { + // TODO(tbg): is this true? Can't there be ranges with replicas on + // multiple local stores, and wouldn't this assertion fire in that + // case once rebalanceStores is invoked on whichever of the two + // stores doesn't hold the lease? + // + // TODO(tbg): see also the other assertion below (leaseholderID != + // store.StoreID) which seems similar to this one. + log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+ + " changes but is not leaseholder: %+v", rstate) + } + } + if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) + continue + } + if !cs.ensureAnalyzedConstraints(rstate) { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) + continue + } + if rstate.constraints.leaseholderID != store.StoreID { + // We should not panic here since the leaseQueue may have shed the + // lease and informed MMA, since the last time MMA computed the + // top-k ranges. This is useful for debugging in the prototype, due + // to the lack of unit tests. + // + // TODO(tbg): can the above scenario currently happen? ComputeChanges + // first processes the leaseholder message and then, still under the + // lock, immediately calls into rebalanceStores (i.e. this store). + // Doesn't this mean that the leaseholder view is up to date? + panic(fmt.Sprintf("internal state inconsistency: "+ + "store=%v range_id=%v should be leaseholder but isn't", + store.StoreID, rangeID)) + } + cands, _ := rstate.constraints.candidatesToMoveLease() + var candsPL storeSet + for _, cand := range cands { + candsPL.insert(cand.storeID) + } + // Always consider the local store (which already holds the lease) as a + // candidate, so that we don't move the lease away if keeping it would be + // the better option overall. + // TODO(tbg): is this really needed? We intentionally exclude the leaseholder + // in candidatesToMoveLease, so why reinsert it now? + candsPL.insert(store.StoreID) + if len(candsPL) <= 1 { + continue // leaseholder is the only candidate + } + clear(scratchNodes) + means := computeMeansForStoreSet(cs, candsPL, scratchNodes, scratchStores) + sls := cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) + log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) + if sls.dimSummary[CPURate] < overloadSlow { + // This store is not cpu overloaded relative to these candidates for + // this range. + log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID) + continue + } + var candsSet candidateSet + for _, cand := range cands { + if disp := cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { + // Don't transfer lease to a store that is lagging. + log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v", + cand.storeID, disp) + continue + } + candSls := cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) + candsSet.candidates = append(candsSet.candidates, candidateInfo{ + StoreID: cand.storeID, + storeLoadSummary: candSls, + diversityScore: 0, + leasePreferenceIndex: cand.leasePreferenceIndex, + }) + } + if len(candsSet.candidates) == 0 { + log.KvDistribution.Infof( + ctx, + "result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]", + ss.NodeID, ss.StoreID, rangeID, candsPL) + continue + } + // Have candidates. We set ignoreLevel to + // ignoreHigherThanLoadThreshold since this is the only allocator that + // can shed leases for this store, and lease shedding is cheap, and it + // will only add CPU to the target store (so it is ok to ignore other + // dimensions on the target). + targetStoreID := sortTargetCandidateSetAndPick( + ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rng) + if targetStoreID == 0 { + log.KvDistribution.Infof( + ctx, + "result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick", + ss.NodeID, ss.StoreID, rangeID) + continue + } + targetSS := cs.stores[targetStoreID] + var addedLoad LoadVector + // Only adding leaseholder CPU. + addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU + if addedLoad[CPURate] < 0 { + // TODO(sumeer): remove this panic once we are not in an + // experimental phase. + addedLoad[CPURate] = 0 + panic("raft cpu higher than total cpu") + } + if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { + log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", + store.StoreID, targetStoreID, rangeID, addedLoad) + continue + } + addTarget := roachpb.ReplicationTarget{ + NodeID: targetSS.NodeID, + StoreID: targetSS.StoreID, } - fmt.Fprintf(&b, " r%d:%v", rangeID, load) + removeTarget := roachpb.ReplicationTarget{ + NodeID: ss.NodeID, + StoreID: ss.StoreID, + } + replicaChanges := MakeLeaseTransferChanges( + rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) + leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:]) + if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { + panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) + } + cs.addPendingRangeChange(leaseChange) + changes = append(changes, leaseChange) + leaseTransferCount++ + if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() { + panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1])) + } + log.KvDistribution.Infof(ctx, + "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ + "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", + rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], + ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, + ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, + targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) + if leaseTransferCount >= maxLeaseTransferCount { + log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount) + return changes + } + doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold + if doneShedding { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK", + store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) + break + } + } + if doneShedding || leaseTransferCount > 0 { + // If managed to transfer a lease, wait for it to be done, before + // shedding replicas from this store (which is more costly). Otherwise + // we may needlessly start moving replicas. Note that the store + // rebalancer will call the rebalance method again after the lease + // transfer is done and we may still be considering those transfers as + // pending from a load perspective, so we *may* not be able to do more + // lease transfers -- so be it. + log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", + store.StoreID, doneShedding, leaseTransferCount) + continue } - log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s", - topKRanges.dim, store.StoreID, localStoreID, b.String()) } else { - log.KvDistribution.Infof(ctx, "no top-K[%s] ranges found for s%d with lease on local s%d", - topKRanges.dim, store.StoreID, localStoreID) + log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", + ss.StoreID, localStoreID, store.dimSummary[CPURate]) + } + + log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next") + + if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && + now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { + log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) + continue + } + // If the node is cpu overloaded, or the store/node is not fdOK, exclude + // the other stores on this node from receiving replicas shed by this + // store. + excludeStoresOnNode := store.nls > overloadSlow + storesToExclude = storesToExclude[:0] + if excludeStoresOnNode { + nodeID := ss.NodeID + for _, storeID := range cs.nodes[nodeID].stores { + storesToExclude.insert(storeID) + } + log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) + } else { + // This store is excluded of course. + storesToExclude.insert(store.StoreID) } - } - // TODO(tbg): it's somewhat akward that we only enter this branch for - // ss.StoreID == localStoreID and not for *any* calling local store. - // More generally, does it make sense that rebalanceStores is called on - // behalf of a particular store (vs. being called on behalf of the set - // of local store IDs)? - if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow { - log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first", - store.StoreID, store.dimSummary[CPURate], overloadSlow) - // This store is local, and cpu overloaded. Shed leases first. - // - // NB: any ranges at this store that don't have pending changes must - // have this local store as the leaseholder. + // Iterate over top-K ranges first and try to move them. topKRanges := ss.adjusted.topKRanges[localStoreID] n := topKRanges.len() + loadDim := topKRanges.dim for i := 0; i < n; i++ { rangeID := topKRanges.index(i) + // TODO(sumeer): the following code belongs in a closure, since we will + // repeat it for some random selection of non topKRanges. rstate := cs.ranges[rangeID] if len(rstate.pendingChanges) > 0 { // If the range has pending changes, don't make more changes. log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) continue } - for _, repl := range rstate.replicas { - if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID - continue - } - if !repl.IsLeaseholder { - // TODO(tbg): is this true? Can't there be ranges with replicas on - // multiple local stores, and wouldn't this assertion fire in that - // case once rebalanceStores is invoked on whichever of the two - // stores doesn't hold the lease? - // - // TODO(tbg): see also the other assertion below (leaseholderID != - // store.StoreID) which seems similar to this one. - log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+ - " changes but is not leaseholder: %+v", rstate) - } - } if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration { log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) continue @@ -221,92 +412,112 @@ func (cs *clusterState) rebalanceStores( log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) continue } - if rstate.constraints.leaseholderID != store.StoreID { - // We should not panic here since the leaseQueue may have shed the - // lease and informed MMA, since the last time MMA computed the - // top-k ranges. This is useful for debugging in the prototype, due - // to the lack of unit tests. - // - // TODO(tbg): can the above scenario currently happen? ComputeChanges - // first processes the leaseholder message and then, still under the - // lock, immediately calls into rebalanceStores (i.e. this store). - // Doesn't this mean that the leaseholder view is up to date? + isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID) + if !isVoter && !isNonVoter { + // We should not panic here since the replicateQueue may have shed the + // lease and informed MMA, since the last time MMA computed the top-k + // ranges. This is useful for debugging in the prototype, due to the + // lack of unit tests. panic(fmt.Sprintf("internal state inconsistency: "+ - "store=%v range_id=%v should be leaseholder but isn't", - store.StoreID, rangeID)) + "store=%v range_id=%v pending-changes=%v "+ + "rstate_replicas=%v rstate_constraints=%v", + store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints)) } - cands, _ := rstate.constraints.candidatesToMoveLease() - var candsPL storeSet - for _, cand := range cands { - candsPL.insert(cand.storeID) - } - // Always consider the local store (which already holds the lease) as a - // candidate, so that we don't move the lease away if keeping it would be - // the better option overall. - // TODO(tbg): is this really needed? We intentionally exclude the leaseholder - // in candidatesToMoveLease, so why reinsert it now? - candsPL.insert(store.StoreID) - if len(candsPL) <= 1 { - continue // leaseholder is the only candidate + var conj constraintsConj + var err error + if isVoter { + conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID) + } else { + conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID) } - clear(scratchNodes) - means := computeMeansForStoreSet(cs, candsPL, scratchNodes, scratchStores) - sls := cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) - log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) - if sls.dimSummary[CPURate] < overloadSlow { - // This store is not cpu overloaded relative to these candidates for - // this range. - log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID) + if err != nil { + // This range has some constraints that are violated. Let those be + // fixed first. + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err) continue } - var candsSet candidateSet - for _, cand := range cands { - if disp := cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { - // Don't transfer lease to a store that is lagging. - log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v", - cand.storeID, disp) + disj[0] = conj + storesToExcludeForRange = append(storesToExcludeForRange[:0], storesToExclude...) + // Also exclude all stores on nodes that have existing replicas. + for _, replica := range rstate.replicas { + storeID := replica.StoreID + if storeID == store.StoreID { + // We don't exclude other stores on this node, since we are allowed to + // transfer the range to them. If the node is overloaded or not fdOK, + // we have already excluded those stores above. continue } - candSls := cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) - candsSet.candidates = append(candsSet.candidates, candidateInfo{ - StoreID: cand.storeID, - storeLoadSummary: candSls, - diversityScore: 0, - leasePreferenceIndex: cand.leasePreferenceIndex, - }) + nodeID := cs.stores[storeID].NodeID + for _, storeID := range cs.nodes[nodeID].stores { + storesToExcludeForRange.insert(storeID) + } } - if len(candsSet.candidates) == 0 { - log.KvDistribution.Infof( - ctx, - "result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]", - ss.NodeID, ss.StoreID, rangeID, candsPL) + // TODO(sumeer): eliminate cands allocations by passing a scratch slice. + cands, ssSLS := cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID) + log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", + rangeID, store.StoreID, ss.adjusted.load) + if log.V(2) { + log.KvDistribution.Infof(ctx, "candidates are:") + for _, c := range cands.candidates { + log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary) + } + } + + if len(cands.candidates) == 0 { + log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID) continue } - // Have candidates. We set ignoreLevel to - // ignoreHigherThanLoadThreshold since this is the only allocator that - // can shed leases for this store, and lease shedding is cheap, and it - // will only add CPU to the target store (so it is ok to ignore other - // dimensions on the target). + var rlocalities replicasLocalityTiers + if isVoter { + rlocalities = rstate.constraints.voterLocalityTiers + } else { + rlocalities = rstate.constraints.replicaLocalityTiers + } + localities := dsm.getExistingReplicaLocalities(rlocalities) + isLeaseholder := rstate.constraints.leaseholderID == store.StoreID + // Set the diversity score and lease preference index of the candidates. + for _, cand := range cands.candidates { + cand.diversityScore = localities.getScoreChangeForRebalance( + ss.localityTiers, cs.stores[cand.StoreID].localityTiers) + if isLeaseholder { + cand.leasePreferenceIndex = matchedLeasePreferenceIndex( + cand.StoreID, rstate.constraints.spanConfig.leasePreferences, cs.constraintMatcher) + } + } + // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and + // s3, s4 are loadNormal. Now s4 is considering rebalancing load away + // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So + // the only way to shed load from s1 is a s1 => s2 move. But there may + // be other ranges at other leaseholder stores which can be moved from + // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer + // of load from s1 => s2 unless s1 is not seeing any load shedding for + // some interval of time. We need a way to capture this information in a + // simple but effective manner. For now, we capture this using these + // grace duration thresholds. + ignoreLevel := ignoreLoadNoChangeAndHigher + overloadDur := now.Sub(ss.overloadStartTime) + if overloadDur > ignoreHigherThanLoadThresholdGraceDuration { + ignoreLevel = ignoreHigherThanLoadThreshold + log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", + ignoreLevel, ssSLS.sls, rangeID, overloadDur) + } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration { + ignoreLevel = ignoreLoadThresholdAndHigher + log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", + ignoreLevel, ssSLS.sls, rangeID, overloadDur) + } targetStoreID := sortTargetCandidateSetAndPick( - ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rng) + ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rng) if targetStoreID == 0 { - log.KvDistribution.Infof( - ctx, - "result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick", - ss.NodeID, ss.StoreID, rangeID) + log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+ + "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) continue } targetSS := cs.stores[targetStoreID] - var addedLoad LoadVector - // Only adding leaseholder CPU. - addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU - if addedLoad[CPURate] < 0 { - // TODO(sumeer): remove this panic once we are not in an - // experimental phase. - addedLoad[CPURate] = 0 - panic("raft cpu higher than total cpu") + addedLoad := rstate.load.Load + if !isLeaseholder { + addedLoad[CPURate] = rstate.load.RaftCPU } - if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { + if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", store.StoreID, targetStoreID, rangeID, addedLoad) continue @@ -319,254 +530,45 @@ func (cs *clusterState) rebalanceStores( NodeID: ss.NodeID, StoreID: ss.StoreID, } - replicaChanges := MakeLeaseTransferChanges( - rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) - leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { - panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) + if addTarget.StoreID == removeTarget.StoreID { + panic(fmt.Sprintf("internal state inconsistency: "+ + "add=%v==remove_target=%v range_id=%v candidates=%v", + addTarget, removeTarget, rangeID, cands.candidates)) } - cs.addPendingRangeChange(leaseChange) - changes = append(changes, leaseChange) - leaseTransferCount++ - if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() { - panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1])) + replicaChanges := makeRebalanceReplicaChanges( + rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) + rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) + if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { + panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", + replicaChanges, rangeID)) } - log.KvDistribution.Infof(ctx, - "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ - "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", - rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], - ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, - ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, - targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) - if leaseTransferCount >= maxLeaseTransferCount { - log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount) + cs.addPendingRangeChange(rangeChange) + changes = append(changes, rangeChange) + rangeMoveCount++ + log.KvDistribution.VInfof(ctx, 2, + "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", + rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load) + if rangeMoveCount >= maxRangeMoveCount { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1)) return changes } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK", + log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk", store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) break } } - if doneShedding || leaseTransferCount > 0 { - // If managed to transfer a lease, wait for it to be done, before - // shedding replicas from this store (which is more costly). Otherwise - // we may needlessly start moving replicas. Note that the store - // rebalancer will call the rebalance method again after the lease - // transfer is done and we may still be considering those transfers as - // pending from a load perspective, so we *may* not be able to do more - // lease transfers -- so be it. - log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", - store.StoreID, doneShedding, leaseTransferCount) - continue - } - } else { - log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", - ss.StoreID, localStoreID, store.dimSummary[CPURate]) - } - - log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next") - - if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && - now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { - log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - continue - } - // If the node is cpu overloaded, or the store/node is not fdOK, exclude - // the other stores on this node from receiving replicas shed by this - // store. - excludeStoresOnNode := store.nls > overloadSlow - storesToExclude = storesToExclude[:0] - if excludeStoresOnNode { - nodeID := ss.NodeID - for _, storeID := range cs.nodes[nodeID].stores { - storesToExclude.insert(storeID) - } - log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) - } else { - // This store is excluded of course. - storesToExclude.insert(store.StoreID) - } - - // Iterate over top-K ranges first and try to move them. - topKRanges := ss.adjusted.topKRanges[localStoreID] - n := topKRanges.len() - loadDim := topKRanges.dim - for i := 0; i < n; i++ { - rangeID := topKRanges.index(i) - // TODO(sumeer): the following code belongs in a closure, since we will - // repeat it for some random selection of non topKRanges. - rstate := cs.ranges[rangeID] - if len(rstate.pendingChanges) > 0 { - // If the range has pending changes, don't make more changes. - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) - continue - } - if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) - continue - } - if !cs.ensureAnalyzedConstraints(rstate) { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) - continue - } - isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID) - if !isVoter && !isNonVoter { - // We should not panic here since the replicateQueue may have shed the - // lease and informed MMA, since the last time MMA computed the top-k - // ranges. This is useful for debugging in the prototype, due to the - // lack of unit tests. - panic(fmt.Sprintf("internal state inconsistency: "+ - "store=%v range_id=%v pending-changes=%v "+ - "rstate_replicas=%v rstate_constraints=%v", - store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints)) - } - var conj constraintsConj - var err error - if isVoter { - conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID) - } else { - conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID) - } - if err != nil { - // This range has some constraints that are violated. Let those be - // fixed first. - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err) - continue - } - disj[0] = conj - storesToExcludeForRange = append(storesToExcludeForRange[:0], storesToExclude...) - // Also exclude all stores on nodes that have existing replicas. - for _, replica := range rstate.replicas { - storeID := replica.StoreID - if storeID == store.StoreID { - // We don't exclude other stores on this node, since we are allowed to - // transfer the range to them. If the node is overloaded or not fdOK, - // we have already excluded those stores above. - continue - } - nodeID := cs.stores[storeID].NodeID - for _, storeID := range cs.nodes[nodeID].stores { - storesToExcludeForRange.insert(storeID) - } - } - // TODO(sumeer): eliminate cands allocations by passing a scratch slice. - cands, ssSLS := cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID) - log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", - rangeID, store.StoreID, ss.adjusted.load) - if log.V(2) { - log.KvDistribution.Infof(ctx, "candidates are:") - for _, c := range cands.candidates { - log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary) - } - } - - if len(cands.candidates) == 0 { - log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID) - continue - } - var rlocalities replicasLocalityTiers - if isVoter { - rlocalities = rstate.constraints.voterLocalityTiers - } else { - rlocalities = rstate.constraints.replicaLocalityTiers - } - localities := dsm.getExistingReplicaLocalities(rlocalities) - isLeaseholder := rstate.constraints.leaseholderID == store.StoreID - // Set the diversity score and lease preference index of the candidates. - for _, cand := range cands.candidates { - cand.diversityScore = localities.getScoreChangeForRebalance( - ss.localityTiers, cs.stores[cand.StoreID].localityTiers) - if isLeaseholder { - cand.leasePreferenceIndex = matchedLeasePreferenceIndex( - cand.StoreID, rstate.constraints.spanConfig.leasePreferences, cs.constraintMatcher) - } - } - // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and - // s3, s4 are loadNormal. Now s4 is considering rebalancing load away - // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So - // the only way to shed load from s1 is a s1 => s2 move. But there may - // be other ranges at other leaseholder stores which can be moved from - // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer - // of load from s1 => s2 unless s1 is not seeing any load shedding for - // some interval of time. We need a way to capture this information in a - // simple but effective manner. For now, we capture this using these - // grace duration thresholds. - ignoreLevel := ignoreLoadNoChangeAndHigher - overloadDur := now.Sub(ss.overloadStartTime) - if overloadDur > ignoreHigherThanLoadThresholdGraceDuration { - ignoreLevel = ignoreHigherThanLoadThreshold - log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", - ignoreLevel, ssSLS.sls, rangeID, overloadDur) - } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration { - ignoreLevel = ignoreLoadThresholdAndHigher - log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", - ignoreLevel, ssSLS.sls, rangeID, overloadDur) - } - targetStoreID := sortTargetCandidateSetAndPick( - ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rng) - if targetStoreID == 0 { - log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+ - "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) - continue - } - targetSS := cs.stores[targetStoreID] - addedLoad := rstate.load.Load - if !isLeaseholder { - addedLoad[CPURate] = rstate.load.RaftCPU - } - if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { - log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", - store.StoreID, targetStoreID, rangeID, addedLoad) - continue - } - addTarget := roachpb.ReplicationTarget{ - NodeID: targetSS.NodeID, - StoreID: targetSS.StoreID, - } - removeTarget := roachpb.ReplicationTarget{ - NodeID: ss.NodeID, - StoreID: ss.StoreID, - } - if addTarget.StoreID == removeTarget.StoreID { - panic(fmt.Sprintf("internal state inconsistency: "+ - "add=%v==remove_target=%v range_id=%v candidates=%v", - addTarget, removeTarget, rangeID, cands.candidates)) - } - replicaChanges := makeRebalanceReplicaChanges( - rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) - rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { - panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", - replicaChanges, rangeID)) - } - cs.addPendingRangeChange(rangeChange) - changes = append(changes, rangeChange) - rangeMoveCount++ - log.KvDistribution.VInfof(ctx, 2, - "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", - rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load) - if rangeMoveCount >= maxRangeMoveCount { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1)) - return changes - } - doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold + // TODO(sumeer): For regular rebalancing, we will wait until those top-K + // move and then continue with the rest. There is a risk that the top-K + // have some constraint that prevents rebalancing, while the rest can be + // moved. Running with underprovisioned clusters and expecting load-based + // rebalancing to work well is not in scope. if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk", - store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) - break + log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) + continue } } - // TODO(sumeer): For regular rebalancing, we will wait until those top-K - // move and then continue with the rest. There is a risk that the top-K - // have some constraint that prevents rebalancing, while the rest can be - // moved. Running with underprovisioned clusters and expecting load-based - // rebalancing to work well is not in scope. - if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - continue - } } return changes } From c9f1ec65885e609854e0b34a00ee73882a911497 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:22:03 +0100 Subject: [PATCH 02/11] mmaprototype: refactor control flow to use boolean flags Convert return and continue statements to boolean flags (shouldReturnEarly, shouldContinue) that are checked after the block. This prepares the code for extraction into a function by moving all loop control structure outside the block that will become shedStore. --- .../cluster_state_rebalance_stores.go | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 065193c76c89..a60caf201160 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -147,6 +147,8 @@ func (cs *clusterState) rebalanceStores( rangeMoveCount := 0 leaseTransferCount := 0 for idx /*logging only*/, store := range sheddingStores { + shouldReturnEarly := false + shouldContinue := false { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) @@ -341,7 +343,8 @@ func (cs *clusterState) rebalanceStores( targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) if leaseTransferCount >= maxLeaseTransferCount { log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount) - return changes + shouldReturnEarly = true + break } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -360,7 +363,8 @@ func (cs *clusterState) rebalanceStores( // lease transfers -- so be it. log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", store.StoreID, doneShedding, leaseTransferCount) - continue + shouldContinue = true + break } } else { log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", @@ -372,7 +376,8 @@ func (cs *clusterState) rebalanceStores( if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - continue + shouldContinue = true + break } // If the node is cpu overloaded, or the store/node is not fdOK, exclude // the other stores on this node from receiving replicas shed by this @@ -550,7 +555,8 @@ func (cs *clusterState) rebalanceStores( rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load) if rangeMoveCount >= maxRangeMoveCount { log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1)) - return changes + shouldReturnEarly = true + break } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -566,9 +572,16 @@ func (cs *clusterState) rebalanceStores( // rebalancing to work well is not in scope. if doneShedding { log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - continue + shouldContinue = true + break } } + if shouldReturnEarly { + return changes + } + if shouldContinue { + continue + } } return changes } From 215b022402f2d4072393f5368d0d4d58e9e2b18e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:22:43 +0100 Subject: [PATCH 03/11] mmaprototype: convert block to function returning boolean flags Convert the shedStore block to an IIFE that returns (bool, bool) for shouldReturnEarly and shouldContinue. Replace all flag assignments with return statements. This prepares the code for adding more return values and parameters. --- .../cluster_state_rebalance_stores.go | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index a60caf201160..fe5fad28da8a 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -147,9 +147,7 @@ func (cs *clusterState) rebalanceStores( rangeMoveCount := 0 leaseTransferCount := 0 for idx /*logging only*/, store := range sheddingStores { - shouldReturnEarly := false - shouldContinue := false - { + shouldReturnEarly, shouldContinue := func() (bool, bool) { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) ss := cs.stores[store.StoreID] @@ -343,8 +341,7 @@ func (cs *clusterState) rebalanceStores( targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) if leaseTransferCount >= maxLeaseTransferCount { log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount) - shouldReturnEarly = true - break + return true, false } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -363,8 +360,7 @@ func (cs *clusterState) rebalanceStores( // lease transfers -- so be it. log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", store.StoreID, doneShedding, leaseTransferCount) - shouldContinue = true - break + return false, true } } else { log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", @@ -376,8 +372,7 @@ func (cs *clusterState) rebalanceStores( if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - shouldContinue = true - break + return false, true } // If the node is cpu overloaded, or the store/node is not fdOK, exclude // the other stores on this node from receiving replicas shed by this @@ -555,8 +550,7 @@ func (cs *clusterState) rebalanceStores( rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load) if rangeMoveCount >= maxRangeMoveCount { log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1)) - shouldReturnEarly = true - break + return true, false } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -572,10 +566,10 @@ func (cs *clusterState) rebalanceStores( // rebalancing to work well is not in scope. if doneShedding { log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - shouldContinue = true - break + return false, true } - } + return false, false + }() if shouldReturnEarly { return changes } From 83970597699d851bfe8fb031091697094ad3fc4b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:36:33 +0100 Subject: [PATCH 04/11] mmaprototype: introduce rebalanceState struct Introduce rebalanceState struct that contains *clusterState and tracks counters and outcomes of rebalanceStores invocation. Refactor the shedStore function to write into *rebalanceState instead of returning values. This prepares for passing *rebalanceState as a parameter in a future change. --- .../cluster_state_rebalance_stores.go | 140 +++++++++++------- 1 file changed, 87 insertions(+), 53 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index fe5fad28da8a..58dbcab173ea 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -23,6 +23,27 @@ import ( var mmaid = atomic.Int64{} +// rebalanceState tracks the state and outcomes of a rebalanceStores invocation. +type rebalanceState struct { + cs *clusterState + // changes accumulates the pending range changes made during rebalancing. + changes []PendingRangeChange + // rangeMoveCount tracks the number of range moves made. + rangeMoveCount int + // leaseTransferCount tracks the number of lease transfers made. + leaseTransferCount int + // shouldReturnEarly indicates the outer loop should return immediately. + shouldReturnEarly bool + // shouldContinue indicates the outer loop should continue to the next iteration. + shouldContinue bool + // maxRangeMoveCount is the maximum number of range moves allowed. + maxRangeMoveCount int + // maxLeaseTransferCount is the maximum number of lease transfers allowed. + maxLeaseTransferCount int + // lastFailedChangeDelayDuration is the delay after a failed change before retrying. + lastFailedChangeDelayDuration time.Duration +} + // Called periodically, say every 10s. // // We do not want to shed replicas for CPU from a remote store until its had a @@ -126,7 +147,6 @@ func (cs *clusterState) rebalanceStores( } } - var changes []PendingRangeChange var disj [1]constraintsConj var storesToExclude storeSet var storesToExcludeForRange storeSet @@ -144,13 +164,22 @@ func (cs *clusterState) rebalanceStores( const maxLeaseTransferCount = 8 // See the long comment where rangeState.lastFailedChange is declared. const lastFailedChangeDelayDuration time.Duration = 60 * time.Second - rangeMoveCount := 0 - leaseTransferCount := 0 + rs := &rebalanceState{ + cs: cs, + changes: []PendingRangeChange{}, + rangeMoveCount: 0, + leaseTransferCount: 0, + shouldReturnEarly: false, + shouldContinue: false, + maxRangeMoveCount: maxRangeMoveCount, + maxLeaseTransferCount: maxLeaseTransferCount, + lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, + } for idx /*logging only*/, store := range sheddingStores { - shouldReturnEarly, shouldContinue := func() (bool, bool) { + func() { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) - ss := cs.stores[store.StoreID] + ss := rs.cs.stores[store.StoreID] doneShedding := false if true { @@ -161,7 +190,7 @@ func (cs *clusterState) rebalanceStores( var b strings.Builder for i := 0; i < n; i++ { rangeID := topKRanges.index(i) - rstate := cs.ranges[rangeID] + rstate := rs.cs.ranges[rangeID] load := rstate.load.Load if !ss.adjusted.replicas[rangeID].IsLeaseholder { load[CPURate] = rstate.load.RaftCPU @@ -192,7 +221,7 @@ func (cs *clusterState) rebalanceStores( n := topKRanges.len() for i := 0; i < n; i++ { rangeID := topKRanges.index(i) - rstate := cs.ranges[rangeID] + rstate := rs.cs.ranges[rangeID] if len(rstate.pendingChanges) > 0 { // If the range has pending changes, don't make more changes. log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) @@ -214,11 +243,11 @@ func (cs *clusterState) rebalanceStores( " changes but is not leaseholder: %+v", rstate) } } - if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration { + if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) continue } - if !cs.ensureAnalyzedConstraints(rstate) { + if !rs.cs.ensureAnalyzedConstraints(rstate) { log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) continue } @@ -251,8 +280,8 @@ func (cs *clusterState) rebalanceStores( continue // leaseholder is the only candidate } clear(scratchNodes) - means := computeMeansForStoreSet(cs, candsPL, scratchNodes, scratchStores) - sls := cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) + means := computeMeansForStoreSet(rs.cs, candsPL, scratchNodes, scratchStores) + sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) if sls.dimSummary[CPURate] < overloadSlow { // This store is not cpu overloaded relative to these candidates for @@ -262,13 +291,13 @@ func (cs *clusterState) rebalanceStores( } var candsSet candidateSet for _, cand := range cands { - if disp := cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { + if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { // Don't transfer lease to a store that is lagging. log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v", cand.storeID, disp) continue } - candSls := cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) + candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) candsSet.candidates = append(candsSet.candidates, candidateInfo{ StoreID: cand.storeID, storeLoadSummary: candSls, @@ -297,7 +326,7 @@ func (cs *clusterState) rebalanceStores( ss.NodeID, ss.StoreID, rangeID) continue } - targetSS := cs.stores[targetStoreID] + targetSS := rs.cs.stores[targetStoreID] var addedLoad LoadVector // Only adding leaseholder CPU. addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU @@ -307,7 +336,7 @@ func (cs *clusterState) rebalanceStores( addedLoad[CPURate] = 0 panic("raft cpu higher than total cpu") } - if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { + if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", store.StoreID, targetStoreID, rangeID, addedLoad) continue @@ -323,25 +352,26 @@ func (cs *clusterState) rebalanceStores( replicaChanges := MakeLeaseTransferChanges( rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { + if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) } - cs.addPendingRangeChange(leaseChange) - changes = append(changes, leaseChange) - leaseTransferCount++ - if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() { - panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1])) + rs.cs.addPendingRangeChange(leaseChange) + rs.changes = append(rs.changes, leaseChange) + rs.leaseTransferCount++ + if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() { + panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1])) } log.KvDistribution.Infof(ctx, "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", - rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], + rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) - if leaseTransferCount >= maxLeaseTransferCount { - log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount) - return true, false + if rs.leaseTransferCount >= rs.maxLeaseTransferCount { + log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount) + rs.shouldReturnEarly = true + return } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -350,7 +380,7 @@ func (cs *clusterState) rebalanceStores( break } } - if doneShedding || leaseTransferCount > 0 { + if doneShedding || rs.leaseTransferCount > 0 { // If managed to transfer a lease, wait for it to be done, before // shedding replicas from this store (which is more costly). Otherwise // we may needlessly start moving replicas. Note that the store @@ -359,8 +389,9 @@ func (cs *clusterState) rebalanceStores( // pending from a load perspective, so we *may* not be able to do more // lease transfers -- so be it. log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", - store.StoreID, doneShedding, leaseTransferCount) - return false, true + store.StoreID, doneShedding, rs.leaseTransferCount) + rs.shouldContinue = true + return } } else { log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", @@ -372,7 +403,8 @@ func (cs *clusterState) rebalanceStores( if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - return false, true + rs.shouldContinue = true + return } // If the node is cpu overloaded, or the store/node is not fdOK, exclude // the other stores on this node from receiving replicas shed by this @@ -381,7 +413,7 @@ func (cs *clusterState) rebalanceStores( storesToExclude = storesToExclude[:0] if excludeStoresOnNode { nodeID := ss.NodeID - for _, storeID := range cs.nodes[nodeID].stores { + for _, storeID := range rs.cs.nodes[nodeID].stores { storesToExclude.insert(storeID) } log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) @@ -404,11 +436,11 @@ func (cs *clusterState) rebalanceStores( log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) continue } - if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration { + if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) continue } - if !cs.ensureAnalyzedConstraints(rstate) { + if !rs.cs.ensureAnalyzedConstraints(rstate) { log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) continue } @@ -447,13 +479,13 @@ func (cs *clusterState) rebalanceStores( // we have already excluded those stores above. continue } - nodeID := cs.stores[storeID].NodeID - for _, storeID := range cs.nodes[nodeID].stores { + nodeID := rs.cs.stores[storeID].NodeID + for _, storeID := range rs.cs.nodes[nodeID].stores { storesToExcludeForRange.insert(storeID) } } // TODO(sumeer): eliminate cands allocations by passing a scratch slice. - cands, ssSLS := cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID) + cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID) log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", rangeID, store.StoreID, ss.adjusted.load) if log.V(2) { @@ -478,10 +510,10 @@ func (cs *clusterState) rebalanceStores( // Set the diversity score and lease preference index of the candidates. for _, cand := range cands.candidates { cand.diversityScore = localities.getScoreChangeForRebalance( - ss.localityTiers, cs.stores[cand.StoreID].localityTiers) + ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers) if isLeaseholder { cand.leasePreferenceIndex = matchedLeasePreferenceIndex( - cand.StoreID, rstate.constraints.spanConfig.leasePreferences, cs.constraintMatcher) + cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher) } } // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and @@ -512,12 +544,12 @@ func (cs *clusterState) rebalanceStores( "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) continue } - targetSS := cs.stores[targetStoreID] + targetSS := rs.cs.stores[targetStoreID] addedLoad := rstate.load.Load if !isLeaseholder { addedLoad[CPURate] = rstate.load.RaftCPU } - if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { + if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", store.StoreID, targetStoreID, rangeID, addedLoad) continue @@ -538,19 +570,20 @@ func (cs *clusterState) rebalanceStores( replicaChanges := makeRebalanceReplicaChanges( rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { + if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", replicaChanges, rangeID)) } - cs.addPendingRangeChange(rangeChange) - changes = append(changes, rangeChange) - rangeMoveCount++ + rs.cs.addPendingRangeChange(rangeChange) + rs.changes = append(rs.changes, rangeChange) + rs.rangeMoveCount++ log.KvDistribution.VInfof(ctx, 2, "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", - rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load) - if rangeMoveCount >= maxRangeMoveCount { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1)) - return true, false + rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load) + if rs.rangeMoveCount >= rs.maxRangeMoveCount { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, rs.maxRangeMoveCount, len(sheddingStores)-(idx+1)) + rs.shouldReturnEarly = true + return } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold if doneShedding { @@ -566,16 +599,17 @@ func (cs *clusterState) rebalanceStores( // rebalancing to work well is not in scope. if doneShedding { log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - return false, true + rs.shouldContinue = true + return } - return false, false }() - if shouldReturnEarly { - return changes + if rs.shouldReturnEarly { + return rs.changes } - if shouldContinue { + if rs.shouldContinue { + rs.shouldContinue = false continue } } - return changes + return rs.changes } From 81ef5471a655a64cec006a177ec8596920e0ef2a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:37:59 +0100 Subject: [PATCH 05/11] mmaprototype: pass *rebalanceState as parameter to shedStore function Change the shedStore IIFE to accept *rebalanceState as a parameter instead of capturing it from the outer scope. This prepares the function for extraction to a standalone method on *rebalanceState. --- .../allocator/mmaprototype/cluster_state_rebalance_stores.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 58dbcab173ea..845840d4cae5 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -176,7 +176,7 @@ func (cs *clusterState) rebalanceStores( lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, } for idx /*logging only*/, store := range sheddingStores { - func() { + func(rs *rebalanceState) { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) ss := rs.cs.stores[store.StoreID] @@ -602,7 +602,7 @@ func (cs *clusterState) rebalanceStores( rs.shouldContinue = true return } - }() + }(rs) if rs.shouldReturnEarly { return rs.changes } From 1a36437bcc23cf8a1a1057f5fa1722584021494d Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:40:03 +0100 Subject: [PATCH 06/11] mmaprototype: add store parameter to shedStore function Add store sheddingStore as a parameter to the shedStore function instead of capturing it from the loop. Remove idx parameter as it was only used for logging and is not needed. --- .../mmaprototype/cluster_state_rebalance_stores.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 845840d4cae5..4b4a15394c60 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -175,8 +175,8 @@ func (cs *clusterState) rebalanceStores( maxLeaseTransferCount: maxLeaseTransferCount, lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, } - for idx /*logging only*/, store := range sheddingStores { - func(rs *rebalanceState) { + for _, store := range sheddingStores { + func(rs *rebalanceState, store sheddingStore) { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) ss := rs.cs.stores[store.StoreID] @@ -581,7 +581,7 @@ func (cs *clusterState) rebalanceStores( "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load) if rs.rangeMoveCount >= rs.maxRangeMoveCount { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, rs.maxRangeMoveCount, len(sheddingStores)-(idx+1)) + log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount) rs.shouldReturnEarly = true return } @@ -602,7 +602,7 @@ func (cs *clusterState) rebalanceStores( rs.shouldContinue = true return } - }(rs) + }(rs, store) if rs.shouldReturnEarly { return rs.changes } From 471dc654fc64c752b7beb8d6427e778276eb08f3 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:42:18 +0100 Subject: [PATCH 07/11] mmaprototype: move rng and dsm into rebalanceState Move rng and dsm fields into the rebalanceState struct instead of passing them as parameters. This reduces the parameter list and keeps related state together. --- .../cluster_state_rebalance_stores.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 4b4a15394c60..8c211f7db4c1 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -26,6 +26,10 @@ var mmaid = atomic.Int64{} // rebalanceState tracks the state and outcomes of a rebalanceStores invocation. type rebalanceState struct { cs *clusterState + // rng is the random number generator for non-deterministic decisions. + rng *rand.Rand + // dsm is the diversity scoring memo for computing diversity scores. + dsm *diversityScoringMemo // changes accumulates the pending range changes made during rebalancing. changes []PendingRangeChange // rangeMoveCount tracks the number of range moves made. @@ -166,6 +170,8 @@ func (cs *clusterState) rebalanceStores( const lastFailedChangeDelayDuration time.Duration = 60 * time.Second rs := &rebalanceState{ cs: cs, + rng: rng, + dsm: dsm, changes: []PendingRangeChange{}, rangeMoveCount: 0, leaseTransferCount: 0, @@ -176,7 +182,7 @@ func (cs *clusterState) rebalanceStores( lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, } for _, store := range sheddingStores { - func(rs *rebalanceState, store sheddingStore) { + func(rs *rebalanceState, store sheddingStore, ctx context.Context, localStoreID roachpb.StoreID, now time.Time) { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", store.StoreID, store.nls, store.sls, store.worstDim) ss := rs.cs.stores[store.StoreID] @@ -318,7 +324,7 @@ func (cs *clusterState) rebalanceStores( // will only add CPU to the target store (so it is ok to ignore other // dimensions on the target). targetStoreID := sortTargetCandidateSetAndPick( - ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rng) + ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng) if targetStoreID == 0 { log.KvDistribution.Infof( ctx, @@ -505,7 +511,7 @@ func (cs *clusterState) rebalanceStores( } else { rlocalities = rstate.constraints.replicaLocalityTiers } - localities := dsm.getExistingReplicaLocalities(rlocalities) + localities := rs.dsm.getExistingReplicaLocalities(rlocalities) isLeaseholder := rstate.constraints.leaseholderID == store.StoreID // Set the diversity score and lease preference index of the candidates. for _, cand := range cands.candidates { @@ -538,7 +544,7 @@ func (cs *clusterState) rebalanceStores( ignoreLevel, ssSLS.sls, rangeID, overloadDur) } targetStoreID := sortTargetCandidateSetAndPick( - ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rng) + ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng) if targetStoreID == 0 { log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+ "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) @@ -602,7 +608,7 @@ func (cs *clusterState) rebalanceStores( rs.shouldContinue = true return } - }(rs, store) + }(rs, store, ctx, localStoreID, now) if rs.shouldReturnEarly { return rs.changes } From 5aa42ddd5f532b3b9e59315a27e9ead701784d26 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:47:26 +0100 Subject: [PATCH 08/11] mmaprototype: add ctx, localStoreID, now parameters and move scratch vars to rebalanceState Add ctx, localStoreID, and now as parameters to the shedStore function. Move scratch variables (disj, storesToExclude, storesToExcludeForRange, scratchNodes, scratchStores) into a nested scratch struct within rebalanceState for better organization. --- .../cluster_state_rebalance_stores.go | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 8c211f7db4c1..b7e7afd3f21b 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -46,6 +46,14 @@ type rebalanceState struct { maxLeaseTransferCount int // lastFailedChangeDelayDuration is the delay after a failed change before retrying. lastFailedChangeDelayDuration time.Duration + // Scratch variables reused across iterations. + scratch struct { + disj [1]constraintsConj + storesToExclude storeSet + storesToExcludeForRange storeSet + nodes map[roachpb.NodeID]*NodeLoad + stores map[roachpb.StoreID]struct{} + } } // Called periodically, say every 10s. @@ -151,11 +159,6 @@ func (cs *clusterState) rebalanceStores( } } - var disj [1]constraintsConj - var storesToExclude storeSet - var storesToExcludeForRange storeSet - scratchNodes := map[roachpb.NodeID]*NodeLoad{} - scratchStores := map[roachpb.StoreID]struct{}{} // The caller has a fixed concurrency limit it can move ranges at, when it // is the sender of the snapshot. So we don't want to create too many // changes, since then the allocator gets too far ahead of what has been @@ -181,6 +184,8 @@ func (cs *clusterState) rebalanceStores( maxLeaseTransferCount: maxLeaseTransferCount, lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, } + rs.scratch.nodes = map[roachpb.NodeID]*NodeLoad{} + rs.scratch.stores = map[roachpb.StoreID]struct{}{} for _, store := range sheddingStores { func(rs *rebalanceState, store sheddingStore, ctx context.Context, localStoreID roachpb.StoreID, now time.Time) { log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", @@ -285,8 +290,8 @@ func (cs *clusterState) rebalanceStores( if len(candsPL) <= 1 { continue // leaseholder is the only candidate } - clear(scratchNodes) - means := computeMeansForStoreSet(rs.cs, candsPL, scratchNodes, scratchStores) + clear(rs.scratch.nodes) + means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores) sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) if sls.dimSummary[CPURate] < overloadSlow { @@ -416,16 +421,16 @@ func (cs *clusterState) rebalanceStores( // the other stores on this node from receiving replicas shed by this // store. excludeStoresOnNode := store.nls > overloadSlow - storesToExclude = storesToExclude[:0] + rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0] if excludeStoresOnNode { nodeID := ss.NodeID for _, storeID := range rs.cs.nodes[nodeID].stores { - storesToExclude.insert(storeID) + rs.scratch.storesToExclude.insert(storeID) } log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) } else { // This store is excluded of course. - storesToExclude.insert(store.StoreID) + rs.scratch.storesToExclude.insert(store.StoreID) } // Iterate over top-K ranges first and try to move them. @@ -474,8 +479,8 @@ func (cs *clusterState) rebalanceStores( log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err) continue } - disj[0] = conj - storesToExcludeForRange = append(storesToExcludeForRange[:0], storesToExclude...) + rs.scratch.disj[0] = conj + rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...) // Also exclude all stores on nodes that have existing replicas. for _, replica := range rstate.replicas { storeID := replica.StoreID @@ -487,11 +492,11 @@ func (cs *clusterState) rebalanceStores( } nodeID := rs.cs.stores[storeID].NodeID for _, storeID := range rs.cs.nodes[nodeID].stores { - storesToExcludeForRange.insert(storeID) + rs.scratch.storesToExcludeForRange.insert(storeID) } } // TODO(sumeer): eliminate cands allocations by passing a scratch slice. - cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID) + cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID) log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", rangeID, store.StoreID, ss.adjusted.load) if log.V(2) { From 1792c83f831cc1d03cb5b615262e45ec45106c06 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:52:30 +0100 Subject: [PATCH 09/11] mmaprototype: extract shedStore logic into rebalanceStore method Extract the shedStore immediately invoked function expression (IIFE) into a standalone rebalanceStore method on *rebalanceState. Move sheddingStore type definition to top level. The method now has a clean signature with rs as receiver and takes store, ctx, localStoreID, and now as parameters. --- .../cluster_state_rebalance_stores.go | 849 +++++++++--------- 1 file changed, 427 insertions(+), 422 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index b7e7afd3f21b..3fdc1a409a74 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -56,6 +56,11 @@ type rebalanceState struct { } } +type sheddingStore struct { + roachpb.StoreID + storeLoadSummary +} + // Called periodically, say every 10s. // // We do not want to shed replicas for CPU from a remote store until its had a @@ -85,10 +90,6 @@ func (cs *clusterState) rebalanceStores( // cpu utilization while the cluster mean is 70% utilization (as an // example). clusterMeans := cs.meansMemo.getMeans(nil) - type sheddingStore struct { - roachpb.StoreID - storeLoadSummary - } var sheddingStores []sheddingStore log.KvDistribution.Infof(ctx, "cluster means: (stores-load %s) (stores-capacity %s) (nodes-cpu-load %d) (nodes-cpu-capacity %d)", @@ -187,440 +188,444 @@ func (cs *clusterState) rebalanceStores( rs.scratch.nodes = map[roachpb.NodeID]*NodeLoad{} rs.scratch.stores = map[roachpb.StoreID]struct{}{} for _, store := range sheddingStores { - func(rs *rebalanceState, store sheddingStore, ctx context.Context, localStoreID roachpb.StoreID, now time.Time) { - log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", - store.StoreID, store.nls, store.sls, store.worstDim) - ss := rs.cs.stores[store.StoreID] - - doneShedding := false - if true { - // Debug logging. - topKRanges := ss.adjusted.topKRanges[localStoreID] - n := topKRanges.len() - if n > 0 { - var b strings.Builder - for i := 0; i < n; i++ { - rangeID := topKRanges.index(i) - rstate := rs.cs.ranges[rangeID] - load := rstate.load.Load - if !ss.adjusted.replicas[rangeID].IsLeaseholder { - load[CPURate] = rstate.load.RaftCPU - } - fmt.Fprintf(&b, " r%d:%v", rangeID, load) - } - log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s", - topKRanges.dim, store.StoreID, localStoreID, b.String()) - } else { - log.KvDistribution.Infof(ctx, "no top-K[%s] ranges found for s%d with lease on local s%d", - topKRanges.dim, store.StoreID, localStoreID) - } - } - - // TODO(tbg): it's somewhat akward that we only enter this branch for - // ss.StoreID == localStoreID and not for *any* calling local store. - // More generally, does it make sense that rebalanceStores is called on - // behalf of a particular store (vs. being called on behalf of the set - // of local store IDs)? - if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow { - log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first", - store.StoreID, store.dimSummary[CPURate], overloadSlow) - // This store is local, and cpu overloaded. Shed leases first. - // - // NB: any ranges at this store that don't have pending changes must - // have this local store as the leaseholder. - topKRanges := ss.adjusted.topKRanges[localStoreID] - n := topKRanges.len() - for i := 0; i < n; i++ { - rangeID := topKRanges.index(i) - rstate := rs.cs.ranges[rangeID] - if len(rstate.pendingChanges) > 0 { - // If the range has pending changes, don't make more changes. - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) - continue - } - for _, repl := range rstate.replicas { - if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID - continue - } - if !repl.IsLeaseholder { - // TODO(tbg): is this true? Can't there be ranges with replicas on - // multiple local stores, and wouldn't this assertion fire in that - // case once rebalanceStores is invoked on whichever of the two - // stores doesn't hold the lease? - // - // TODO(tbg): see also the other assertion below (leaseholderID != - // store.StoreID) which seems similar to this one. - log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+ - " changes but is not leaseholder: %+v", rstate) - } - } - if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) - continue - } - if !rs.cs.ensureAnalyzedConstraints(rstate) { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) - continue - } - if rstate.constraints.leaseholderID != store.StoreID { - // We should not panic here since the leaseQueue may have shed the - // lease and informed MMA, since the last time MMA computed the - // top-k ranges. This is useful for debugging in the prototype, due - // to the lack of unit tests. - // - // TODO(tbg): can the above scenario currently happen? ComputeChanges - // first processes the leaseholder message and then, still under the - // lock, immediately calls into rebalanceStores (i.e. this store). - // Doesn't this mean that the leaseholder view is up to date? - panic(fmt.Sprintf("internal state inconsistency: "+ - "store=%v range_id=%v should be leaseholder but isn't", - store.StoreID, rangeID)) - } - cands, _ := rstate.constraints.candidatesToMoveLease() - var candsPL storeSet - for _, cand := range cands { - candsPL.insert(cand.storeID) - } - // Always consider the local store (which already holds the lease) as a - // candidate, so that we don't move the lease away if keeping it would be - // the better option overall. - // TODO(tbg): is this really needed? We intentionally exclude the leaseholder - // in candidatesToMoveLease, so why reinsert it now? - candsPL.insert(store.StoreID) - if len(candsPL) <= 1 { - continue // leaseholder is the only candidate - } - clear(rs.scratch.nodes) - means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores) - sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) - log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) - if sls.dimSummary[CPURate] < overloadSlow { - // This store is not cpu overloaded relative to these candidates for - // this range. - log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID) - continue - } - var candsSet candidateSet - for _, cand := range cands { - if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { - // Don't transfer lease to a store that is lagging. - log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v", - cand.storeID, disp) - continue - } - candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) - candsSet.candidates = append(candsSet.candidates, candidateInfo{ - StoreID: cand.storeID, - storeLoadSummary: candSls, - diversityScore: 0, - leasePreferenceIndex: cand.leasePreferenceIndex, - }) - } - if len(candsSet.candidates) == 0 { - log.KvDistribution.Infof( - ctx, - "result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]", - ss.NodeID, ss.StoreID, rangeID, candsPL) - continue - } - // Have candidates. We set ignoreLevel to - // ignoreHigherThanLoadThreshold since this is the only allocator that - // can shed leases for this store, and lease shedding is cheap, and it - // will only add CPU to the target store (so it is ok to ignore other - // dimensions on the target). - targetStoreID := sortTargetCandidateSetAndPick( - ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng) - if targetStoreID == 0 { - log.KvDistribution.Infof( - ctx, - "result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick", - ss.NodeID, ss.StoreID, rangeID) - continue - } - targetSS := rs.cs.stores[targetStoreID] - var addedLoad LoadVector - // Only adding leaseholder CPU. - addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU - if addedLoad[CPURate] < 0 { - // TODO(sumeer): remove this panic once we are not in an - // experimental phase. - addedLoad[CPURate] = 0 - panic("raft cpu higher than total cpu") - } - if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { - log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", - store.StoreID, targetStoreID, rangeID, addedLoad) - continue - } - addTarget := roachpb.ReplicationTarget{ - NodeID: targetSS.NodeID, - StoreID: targetSS.StoreID, - } - removeTarget := roachpb.ReplicationTarget{ - NodeID: ss.NodeID, - StoreID: ss.StoreID, - } - replicaChanges := MakeLeaseTransferChanges( - rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) - leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { - panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) - } - rs.cs.addPendingRangeChange(leaseChange) - rs.changes = append(rs.changes, leaseChange) - rs.leaseTransferCount++ - if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() { - panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1])) - } - log.KvDistribution.Infof(ctx, - "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ - "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", - rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], - ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, - ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, - targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) - if rs.leaseTransferCount >= rs.maxLeaseTransferCount { - log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount) - rs.shouldReturnEarly = true - return - } - doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold - if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK", - store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) - break - } - } - if doneShedding || rs.leaseTransferCount > 0 { - // If managed to transfer a lease, wait for it to be done, before - // shedding replicas from this store (which is more costly). Otherwise - // we may needlessly start moving replicas. Note that the store - // rebalancer will call the rebalance method again after the lease - // transfer is done and we may still be considering those transfers as - // pending from a load perspective, so we *may* not be able to do more - // lease transfers -- so be it. - log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", - store.StoreID, doneShedding, rs.leaseTransferCount) - rs.shouldContinue = true - return - } - } else { - log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", - ss.StoreID, localStoreID, store.dimSummary[CPURate]) - } - - log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next") + rs.rebalanceStore(store, ctx, localStoreID, now) + if rs.shouldReturnEarly { + return rs.changes + } + if rs.shouldContinue { + rs.shouldContinue = false + continue + } + } + return rs.changes +} - if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && - now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { - log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - rs.shouldContinue = true - return - } - // If the node is cpu overloaded, or the store/node is not fdOK, exclude - // the other stores on this node from receiving replicas shed by this - // store. - excludeStoresOnNode := store.nls > overloadSlow - rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0] - if excludeStoresOnNode { - nodeID := ss.NodeID - for _, storeID := range rs.cs.nodes[nodeID].stores { - rs.scratch.storesToExclude.insert(storeID) - } - log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) - } else { - // This store is excluded of course. - rs.scratch.storesToExclude.insert(store.StoreID) - } +func (rs *rebalanceState) rebalanceStore( + store sheddingStore, ctx context.Context, localStoreID roachpb.StoreID, now time.Time, +) { + log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s", + store.StoreID, store.nls, store.sls, store.worstDim) + ss := rs.cs.stores[store.StoreID] - // Iterate over top-K ranges first and try to move them. - topKRanges := ss.adjusted.topKRanges[localStoreID] - n := topKRanges.len() - loadDim := topKRanges.dim + doneShedding := false + if true { + // Debug logging. + topKRanges := ss.adjusted.topKRanges[localStoreID] + n := topKRanges.len() + if n > 0 { + var b strings.Builder for i := 0; i < n; i++ { rangeID := topKRanges.index(i) - // TODO(sumeer): the following code belongs in a closure, since we will - // repeat it for some random selection of non topKRanges. - rstate := cs.ranges[rangeID] - if len(rstate.pendingChanges) > 0 { - // If the range has pending changes, don't make more changes. - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) - continue - } - if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) - continue - } - if !rs.cs.ensureAnalyzedConstraints(rstate) { - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) - continue - } - isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID) - if !isVoter && !isNonVoter { - // We should not panic here since the replicateQueue may have shed the - // lease and informed MMA, since the last time MMA computed the top-k - // ranges. This is useful for debugging in the prototype, due to the - // lack of unit tests. - panic(fmt.Sprintf("internal state inconsistency: "+ - "store=%v range_id=%v pending-changes=%v "+ - "rstate_replicas=%v rstate_constraints=%v", - store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints)) - } - var conj constraintsConj - var err error - if isVoter { - conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID) - } else { - conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID) - } - if err != nil { - // This range has some constraints that are violated. Let those be - // fixed first. - log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err) - continue - } - rs.scratch.disj[0] = conj - rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...) - // Also exclude all stores on nodes that have existing replicas. - for _, replica := range rstate.replicas { - storeID := replica.StoreID - if storeID == store.StoreID { - // We don't exclude other stores on this node, since we are allowed to - // transfer the range to them. If the node is overloaded or not fdOK, - // we have already excluded those stores above. - continue - } - nodeID := rs.cs.stores[storeID].NodeID - for _, storeID := range rs.cs.nodes[nodeID].stores { - rs.scratch.storesToExcludeForRange.insert(storeID) - } - } - // TODO(sumeer): eliminate cands allocations by passing a scratch slice. - cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID) - log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", - rangeID, store.StoreID, ss.adjusted.load) - if log.V(2) { - log.KvDistribution.Infof(ctx, "candidates are:") - for _, c := range cands.candidates { - log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary) - } + rstate := rs.cs.ranges[rangeID] + load := rstate.load.Load + if !ss.adjusted.replicas[rangeID].IsLeaseholder { + load[CPURate] = rstate.load.RaftCPU } + fmt.Fprintf(&b, " r%d:%v", rangeID, load) + } + log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s", + topKRanges.dim, store.StoreID, localStoreID, b.String()) + } else { + log.KvDistribution.Infof(ctx, "no top-K[%s] ranges found for s%d with lease on local s%d", + topKRanges.dim, store.StoreID, localStoreID) + } + } - if len(cands.candidates) == 0 { - log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID) - continue - } - var rlocalities replicasLocalityTiers - if isVoter { - rlocalities = rstate.constraints.voterLocalityTiers - } else { - rlocalities = rstate.constraints.replicaLocalityTiers - } - localities := rs.dsm.getExistingReplicaLocalities(rlocalities) - isLeaseholder := rstate.constraints.leaseholderID == store.StoreID - // Set the diversity score and lease preference index of the candidates. - for _, cand := range cands.candidates { - cand.diversityScore = localities.getScoreChangeForRebalance( - ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers) - if isLeaseholder { - cand.leasePreferenceIndex = matchedLeasePreferenceIndex( - cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher) - } - } - // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and - // s3, s4 are loadNormal. Now s4 is considering rebalancing load away - // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So - // the only way to shed load from s1 is a s1 => s2 move. But there may - // be other ranges at other leaseholder stores which can be moved from - // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer - // of load from s1 => s2 unless s1 is not seeing any load shedding for - // some interval of time. We need a way to capture this information in a - // simple but effective manner. For now, we capture this using these - // grace duration thresholds. - ignoreLevel := ignoreLoadNoChangeAndHigher - overloadDur := now.Sub(ss.overloadStartTime) - if overloadDur > ignoreHigherThanLoadThresholdGraceDuration { - ignoreLevel = ignoreHigherThanLoadThreshold - log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", - ignoreLevel, ssSLS.sls, rangeID, overloadDur) - } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration { - ignoreLevel = ignoreLoadThresholdAndHigher - log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", - ignoreLevel, ssSLS.sls, rangeID, overloadDur) - } - targetStoreID := sortTargetCandidateSetAndPick( - ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng) - if targetStoreID == 0 { - log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+ - "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) + // TODO(tbg): it's somewhat akward that we only enter this branch for + // ss.StoreID == localStoreID and not for *any* calling local store. + // More generally, does it make sense that rebalanceStores is called on + // behalf of a particular store (vs. being called on behalf of the set + // of local store IDs)? + if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow { + log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first", + store.StoreID, store.dimSummary[CPURate], overloadSlow) + // This store is local, and cpu overloaded. Shed leases first. + // + // NB: any ranges at this store that don't have pending changes must + // have this local store as the leaseholder. + topKRanges := ss.adjusted.topKRanges[localStoreID] + n := topKRanges.len() + for i := 0; i < n; i++ { + rangeID := topKRanges.index(i) + rstate := rs.cs.ranges[rangeID] + if len(rstate.pendingChanges) > 0 { + // If the range has pending changes, don't make more changes. + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) + continue + } + for _, repl := range rstate.replicas { + if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID continue } - targetSS := rs.cs.stores[targetStoreID] - addedLoad := rstate.load.Load - if !isLeaseholder { - addedLoad[CPURate] = rstate.load.RaftCPU + if !repl.IsLeaseholder { + // TODO(tbg): is this true? Can't there be ranges with replicas on + // multiple local stores, and wouldn't this assertion fire in that + // case once rebalanceStores is invoked on whichever of the two + // stores doesn't hold the lease? + // + // TODO(tbg): see also the other assertion below (leaseholderID != + // store.StoreID) which seems similar to this one. + log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+ + " changes but is not leaseholder: %+v", rstate) } - if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { - log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", - store.StoreID, targetStoreID, rangeID, addedLoad) + } + if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) + continue + } + if !rs.cs.ensureAnalyzedConstraints(rstate) { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) + continue + } + if rstate.constraints.leaseholderID != store.StoreID { + // We should not panic here since the leaseQueue may have shed the + // lease and informed MMA, since the last time MMA computed the + // top-k ranges. This is useful for debugging in the prototype, due + // to the lack of unit tests. + // + // TODO(tbg): can the above scenario currently happen? ComputeChanges + // first processes the leaseholder message and then, still under the + // lock, immediately calls into rebalanceStores (i.e. this store). + // Doesn't this mean that the leaseholder view is up to date? + panic(fmt.Sprintf("internal state inconsistency: "+ + "store=%v range_id=%v should be leaseholder but isn't", + store.StoreID, rangeID)) + } + cands, _ := rstate.constraints.candidatesToMoveLease() + var candsPL storeSet + for _, cand := range cands { + candsPL.insert(cand.storeID) + } + // Always consider the local store (which already holds the lease) as a + // candidate, so that we don't move the lease away if keeping it would be + // the better option overall. + // TODO(tbg): is this really needed? We intentionally exclude the leaseholder + // in candidatesToMoveLease, so why reinsert it now? + candsPL.insert(store.StoreID) + if len(candsPL) <= 1 { + continue // leaseholder is the only candidate + } + clear(rs.scratch.nodes) + means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores) + sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad) + log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL) + if sls.dimSummary[CPURate] < overloadSlow { + // This store is not cpu overloaded relative to these candidates for + // this range. + log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID) + continue + } + var candsSet candidateSet + for _, cand := range cands { + if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK { + // Don't transfer lease to a store that is lagging. + log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v", + cand.storeID, disp) continue } - addTarget := roachpb.ReplicationTarget{ - NodeID: targetSS.NodeID, - StoreID: targetSS.StoreID, - } - removeTarget := roachpb.ReplicationTarget{ - NodeID: ss.NodeID, - StoreID: ss.StoreID, - } - if addTarget.StoreID == removeTarget.StoreID { - panic(fmt.Sprintf("internal state inconsistency: "+ - "add=%v==remove_target=%v range_id=%v candidates=%v", - addTarget, removeTarget, rangeID, cands.candidates)) - } - replicaChanges := makeRebalanceReplicaChanges( - rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) - rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) - if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { - panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", - replicaChanges, rangeID)) - } - rs.cs.addPendingRangeChange(rangeChange) - rs.changes = append(rs.changes, rangeChange) - rs.rangeMoveCount++ - log.KvDistribution.VInfof(ctx, 2, - "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", - rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load) - if rs.rangeMoveCount >= rs.maxRangeMoveCount { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount) - rs.shouldReturnEarly = true - return - } - doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold - if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk", - store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) - break - } + candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad) + candsSet.candidates = append(candsSet.candidates, candidateInfo{ + StoreID: cand.storeID, + storeLoadSummary: candSls, + diversityScore: 0, + leasePreferenceIndex: cand.leasePreferenceIndex, + }) } - // TODO(sumeer): For regular rebalancing, we will wait until those top-K - // move and then continue with the rest. There is a risk that the top-K - // have some constraint that prevents rebalancing, while the rest can be - // moved. Running with underprovisioned clusters and expecting load-based - // rebalancing to work well is not in scope. - if doneShedding { - log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - rs.shouldContinue = true + if len(candsSet.candidates) == 0 { + log.KvDistribution.Infof( + ctx, + "result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]", + ss.NodeID, ss.StoreID, rangeID, candsPL) + continue + } + // Have candidates. We set ignoreLevel to + // ignoreHigherThanLoadThreshold since this is the only allocator that + // can shed leases for this store, and lease shedding is cheap, and it + // will only add CPU to the target store (so it is ok to ignore other + // dimensions on the target). + targetStoreID := sortTargetCandidateSetAndPick( + ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng) + if targetStoreID == 0 { + log.KvDistribution.Infof( + ctx, + "result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick", + ss.NodeID, ss.StoreID, rangeID) + continue + } + targetSS := rs.cs.stores[targetStoreID] + var addedLoad LoadVector + // Only adding leaseholder CPU. + addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU + if addedLoad[CPURate] < 0 { + // TODO(sumeer): remove this panic once we are not in an + // experimental phase. + addedLoad[CPURate] = 0 + panic("raft cpu higher than total cpu") + } + if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) { + log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", + store.StoreID, targetStoreID, rangeID, addedLoad) + continue + } + addTarget := roachpb.ReplicationTarget{ + NodeID: targetSS.NodeID, + StoreID: targetSS.StoreID, + } + removeTarget := roachpb.ReplicationTarget{ + NodeID: ss.NodeID, + StoreID: ss.StoreID, + } + replicaChanges := MakeLeaseTransferChanges( + rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) + leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:]) + if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil { + panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) + } + rs.cs.addPendingRangeChange(leaseChange) + rs.changes = append(rs.changes, leaseChange) + rs.leaseTransferCount++ + if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() { + panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1])) + } + log.KvDistribution.Infof(ctx, + "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ + "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", + rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], + ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, + ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, + targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) + if rs.leaseTransferCount >= rs.maxLeaseTransferCount { + log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount) + rs.shouldReturnEarly = true return } - }(rs, store, ctx, localStoreID, now) - if rs.shouldReturnEarly { - return rs.changes + doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold + if doneShedding { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK", + store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) + break + } } - if rs.shouldContinue { - rs.shouldContinue = false + if doneShedding || rs.leaseTransferCount > 0 { + // If managed to transfer a lease, wait for it to be done, before + // shedding replicas from this store (which is more costly). Otherwise + // we may needlessly start moving replicas. Note that the store + // rebalancer will call the rebalance method again after the lease + // transfer is done and we may still be considering those transfers as + // pending from a load perspective, so we *may* not be able to do more + // lease transfers -- so be it. + log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", + store.StoreID, doneShedding, rs.leaseTransferCount) + rs.shouldContinue = true + return + } + } else { + log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v", + ss.StoreID, localStoreID, store.dimSummary[CPURate]) + } + + log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next") + + if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && + now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { + log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) + rs.shouldContinue = true + return + } + // If the node is cpu overloaded, or the store/node is not fdOK, exclude + // the other stores on this node from receiving replicas shed by this + // store. + excludeStoresOnNode := store.nls > overloadSlow + rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0] + if excludeStoresOnNode { + nodeID := ss.NodeID + for _, storeID := range rs.cs.nodes[nodeID].stores { + rs.scratch.storesToExclude.insert(storeID) + } + log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID) + } else { + // This store is excluded of course. + rs.scratch.storesToExclude.insert(store.StoreID) + } + + // Iterate over top-K ranges first and try to move them. + topKRanges := ss.adjusted.topKRanges[localStoreID] + n := topKRanges.len() + loadDim := topKRanges.dim + for i := 0; i < n; i++ { + rangeID := topKRanges.index(i) + // TODO(sumeer): the following code belongs in a closure, since we will + // repeat it for some random selection of non topKRanges. + rstate := rs.cs.ranges[rangeID] + if len(rstate.pendingChanges) > 0 { + // If the range has pending changes, don't make more changes. + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID) continue } + if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID) + continue + } + if !rs.cs.ensureAnalyzedConstraints(rstate) { + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID) + continue + } + isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID) + if !isVoter && !isNonVoter { + // We should not panic here since the replicateQueue may have shed the + // lease and informed MMA, since the last time MMA computed the top-k + // ranges. This is useful for debugging in the prototype, due to the + // lack of unit tests. + panic(fmt.Sprintf("internal state inconsistency: "+ + "store=%v range_id=%v pending-changes=%v "+ + "rstate_replicas=%v rstate_constraints=%v", + store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints)) + } + var conj constraintsConj + var err error + if isVoter { + conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID) + } else { + conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID) + } + if err != nil { + // This range has some constraints that are violated. Let those be + // fixed first. + log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err) + continue + } + rs.scratch.disj[0] = conj + rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...) + // Also exclude all stores on nodes that have existing replicas. + for _, replica := range rstate.replicas { + storeID := replica.StoreID + if storeID == store.StoreID { + // We don't exclude other stores on this node, since we are allowed to + // transfer the range to them. If the node is overloaded or not fdOK, + // we have already excluded those stores above. + continue + } + nodeID := rs.cs.stores[storeID].NodeID + for _, storeID := range rs.cs.nodes[nodeID].stores { + rs.scratch.storesToExcludeForRange.insert(storeID) + } + } + // TODO(sumeer): eliminate cands allocations by passing a scratch slice. + cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID) + log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v", + rangeID, store.StoreID, ss.adjusted.load) + if log.V(2) { + log.KvDistribution.Infof(ctx, "candidates are:") + for _, c := range cands.candidates { + log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary) + } + } + + if len(cands.candidates) == 0 { + log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID) + continue + } + var rlocalities replicasLocalityTiers + if isVoter { + rlocalities = rstate.constraints.voterLocalityTiers + } else { + rlocalities = rstate.constraints.replicaLocalityTiers + } + localities := rs.dsm.getExistingReplicaLocalities(rlocalities) + isLeaseholder := rstate.constraints.leaseholderID == store.StoreID + // Set the diversity score and lease preference index of the candidates. + for _, cand := range cands.candidates { + cand.diversityScore = localities.getScoreChangeForRebalance( + ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers) + if isLeaseholder { + cand.leasePreferenceIndex = matchedLeasePreferenceIndex( + cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher) + } + } + // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and + // s3, s4 are loadNormal. Now s4 is considering rebalancing load away + // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So + // the only way to shed load from s1 is a s1 => s2 move. But there may + // be other ranges at other leaseholder stores which can be moved from + // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer + // of load from s1 => s2 unless s1 is not seeing any load shedding for + // some interval of time. We need a way to capture this information in a + // simple but effective manner. For now, we capture this using these + // grace duration thresholds. + ignoreLevel := ignoreLoadNoChangeAndHigher + overloadDur := now.Sub(ss.overloadStartTime) + if overloadDur > ignoreHigherThanLoadThresholdGraceDuration { + ignoreLevel = ignoreHigherThanLoadThreshold + log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", + ignoreLevel, ssSLS.sls, rangeID, overloadDur) + } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration { + ignoreLevel = ignoreLoadThresholdAndHigher + log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v", + ignoreLevel, ssSLS.sls, rangeID, overloadDur) + } + targetStoreID := sortTargetCandidateSetAndPick( + ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng) + if targetStoreID == 0 { + log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+ + "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel) + continue + } + targetSS := rs.cs.stores[targetStoreID] + addedLoad := rstate.load.Load + if !isLeaseholder { + addedLoad[CPURate] = rstate.load.RaftCPU + } + if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) { + log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v", + store.StoreID, targetStoreID, rangeID, addedLoad) + continue + } + addTarget := roachpb.ReplicationTarget{ + NodeID: targetSS.NodeID, + StoreID: targetSS.StoreID, + } + removeTarget := roachpb.ReplicationTarget{ + NodeID: ss.NodeID, + StoreID: ss.StoreID, + } + if addTarget.StoreID == removeTarget.StoreID { + panic(fmt.Sprintf("internal state inconsistency: "+ + "add=%v==remove_target=%v range_id=%v candidates=%v", + addTarget, removeTarget, rangeID, cands.candidates)) + } + replicaChanges := makeRebalanceReplicaChanges( + rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) + rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) + if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { + panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", + replicaChanges, rangeID)) + } + rs.cs.addPendingRangeChange(rangeChange) + rs.changes = append(rs.changes, rangeChange) + rs.rangeMoveCount++ + log.KvDistribution.VInfof(ctx, 2, + "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", + rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load) + if rs.rangeMoveCount >= rs.maxRangeMoveCount { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount) + rs.shouldReturnEarly = true + return + } + doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold + if doneShedding { + log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk", + store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1)) + break + } + } + // TODO(sumeer): For regular rebalancing, we will wait until those top-K + // move and then continue with the rest. There is a risk that the top-K + // have some constraint that prevents rebalancing, while the rest can be + // moved. Running with underprovisioned clusters and expecting load-based + // rebalancing to work well is not in scope. + if doneShedding { + log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) + rs.shouldContinue = true + return } - return rs.changes } From b5ce55183dd43229e403825dd70fb9e7a0f1cb46 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:56:13 +0100 Subject: [PATCH 10/11] mmaprototype: remove shouldReturnEarly flag from rebalanceState Remove the shouldReturnEarly flag and instead check the condition directly in the loop. The loop now checks if rangeMoveCount or leaseTransferCount has reached their limits before processing each store, and breaks if either condition is met. --- .../mmaprototype/cluster_state_rebalance_stores.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 3fdc1a409a74..3cf62eca8a6b 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -36,8 +36,6 @@ type rebalanceState struct { rangeMoveCount int // leaseTransferCount tracks the number of lease transfers made. leaseTransferCount int - // shouldReturnEarly indicates the outer loop should return immediately. - shouldReturnEarly bool // shouldContinue indicates the outer loop should continue to the next iteration. shouldContinue bool // maxRangeMoveCount is the maximum number of range moves allowed. @@ -179,7 +177,6 @@ func (cs *clusterState) rebalanceStores( changes: []PendingRangeChange{}, rangeMoveCount: 0, leaseTransferCount: 0, - shouldReturnEarly: false, shouldContinue: false, maxRangeMoveCount: maxRangeMoveCount, maxLeaseTransferCount: maxLeaseTransferCount, @@ -188,10 +185,10 @@ func (cs *clusterState) rebalanceStores( rs.scratch.nodes = map[roachpb.NodeID]*NodeLoad{} rs.scratch.stores = map[roachpb.StoreID]struct{}{} for _, store := range sheddingStores { - rs.rebalanceStore(store, ctx, localStoreID, now) - if rs.shouldReturnEarly { - return rs.changes + if rs.rangeMoveCount >= rs.maxRangeMoveCount || rs.leaseTransferCount >= rs.maxLeaseTransferCount { + break } + rs.rebalanceStore(store, ctx, localStoreID, now) if rs.shouldContinue { rs.shouldContinue = false continue @@ -396,7 +393,6 @@ func (rs *rebalanceState) rebalanceStore( targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) if rs.leaseTransferCount >= rs.maxLeaseTransferCount { log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount) - rs.shouldReturnEarly = true return } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold @@ -608,7 +604,6 @@ func (rs *rebalanceState) rebalanceStore( rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load) if rs.rangeMoveCount >= rs.maxRangeMoveCount { log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount) - rs.shouldReturnEarly = true return } doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold From db7c2de6c9ce0859b38bbc4dc9a2db0875a1b66c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 12 Nov 2025 17:57:49 +0100 Subject: [PATCH 11/11] mmaprototype: remove shouldContinue flag from rebalanceState Remove the shouldContinue flag since the function returns normally when it wants to skip to the next store, and the loop naturally continues to the next iteration. --- .../mmaprototype/cluster_state_rebalance_stores.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index 3cf62eca8a6b..6b3f89a4180b 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -36,8 +36,6 @@ type rebalanceState struct { rangeMoveCount int // leaseTransferCount tracks the number of lease transfers made. leaseTransferCount int - // shouldContinue indicates the outer loop should continue to the next iteration. - shouldContinue bool // maxRangeMoveCount is the maximum number of range moves allowed. maxRangeMoveCount int // maxLeaseTransferCount is the maximum number of lease transfers allowed. @@ -177,7 +175,6 @@ func (cs *clusterState) rebalanceStores( changes: []PendingRangeChange{}, rangeMoveCount: 0, leaseTransferCount: 0, - shouldContinue: false, maxRangeMoveCount: maxRangeMoveCount, maxLeaseTransferCount: maxLeaseTransferCount, lastFailedChangeDelayDuration: lastFailedChangeDelayDuration, @@ -189,10 +186,6 @@ func (cs *clusterState) rebalanceStores( break } rs.rebalanceStore(store, ctx, localStoreID, now) - if rs.shouldContinue { - rs.shouldContinue = false - continue - } } return rs.changes } @@ -412,7 +405,6 @@ func (rs *rebalanceState) rebalanceStore( // lease transfers -- so be it. log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d", store.StoreID, doneShedding, rs.leaseTransferCount) - rs.shouldContinue = true return } } else { @@ -425,7 +417,6 @@ func (rs *rebalanceState) rebalanceStore( if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow && now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration { log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID) - rs.shouldContinue = true return } // If the node is cpu overloaded, or the store/node is not fdOK, exclude @@ -620,7 +611,6 @@ func (rs *rebalanceState) rebalanceStore( // rebalancing to work well is not in scope. if doneShedding { log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID) - rs.shouldContinue = true return } }