Skip to content

Commit

Permalink
server/balancer: guarantee replicas are safe if possible (#473)
Browse files Browse the repository at this point in the history
* server: guarantee replicas are safe when possible

* check location labels before putting store
  • Loading branch information
huachaohuang authored Jan 21, 2017
1 parent 550ba5e commit aca3a4a
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 164 deletions.
2 changes: 1 addition & 1 deletion conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ max-replicas = 3
# The placement priorities is implied by the order of label keys.
# For example, ["zone", "rack"] means that we should place replicas to
# different zones first, then to different racks if we don't have enough zones.
location-labels = []
location-labels = []
50 changes: 26 additions & 24 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type leaderBalancer struct {
func newLeaderBalancer(opt *scheduleOption) *leaderBalancer {
var filters []Filter
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newLeaderCountFilter(opt))

return &leaderBalancer{
Expand Down Expand Up @@ -74,6 +75,7 @@ func newStorageBalancer(opt *scheduleOption) *storageBalancer {
var filters []Filter
filters = append(filters, newCacheFilter(cache))
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newRegionCountFilter(opt))
filters = append(filters, newSnapshotCountFilter(opt))

Expand Down Expand Up @@ -115,14 +117,13 @@ func (s *storageBalancer) Schedule(cluster *clusterInfo) Operator {
}

func (s *storageBalancer) transferPeer(cluster *clusterInfo, region *regionInfo, oldPeer *metapb.Peer) Operator {
// scoreGuard guarantees that the distinct score will not decrease.
stores := cluster.getRegionStores(region)
source := cluster.getStore(oldPeer.GetStoreId())
scoreGuard := newDistinctScoreFilter(s.rep, stores, source)

// Allocate a new peer from the store with smallest storage ratio.
// We need to ensure the target store will not break the replication constraints.
excluded := newExcludedFilter(nil, region.GetStoreIds())
replication := newReplicationFilter(s.rep, stores, source)
newPeer := scheduleAddPeer(cluster, s.selector, excluded, replication)
checker := newReplicaChecker(s.opt, cluster)
newPeer, _ := checker.selectBestPeer(region, scoreGuard)
if newPeer == nil {
return nil
}
Expand All @@ -145,7 +146,7 @@ type replicaChecker struct {

func newReplicaChecker(opt *scheduleOption, cluster *clusterInfo) *replicaChecker {
var filters []Filter
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newSnapshotCountFilter(opt))
filters = append(filters, newStorageThresholdFilter(opt))

Expand Down Expand Up @@ -181,33 +182,35 @@ func (r *replicaChecker) Check(region *regionInfo) Operator {
return newRemovePeer(region, oldPeer)
}

return r.checkBetterPeer(region)
return r.checkBestReplacement(region)
}

// selectBestPeer returns the best peer in other stores.
func (r *replicaChecker) selectBestPeer(region *regionInfo, filters ...Filter) (*metapb.Peer, float64) {
filters = append(filters, r.filters...)
// Add some must have filters.
filters = append(filters, newStateFilter(r.opt))
filters = append(filters, newExcludedFilter(nil, region.GetStoreIds()))

var (
bestStore *storeInfo
bestScore float64
)

// Find the store with best score.
regionStores := r.cluster.getRegionStores(region)
// Select the store with best distinct score.
// If the scores are the same, select the store with minimal storage ratio.
stores := r.cluster.getRegionStores(region)
for _, store := range r.cluster.getStores() {
if filterTarget(store, filters) {
continue
}
score := r.rep.GetReplicaScore(regionStores, store)
score := r.rep.GetDistinctScore(stores, store)
if bestStore == nil || compareStoreScore(store, score, bestStore, bestScore) > 0 {
bestStore = store
bestScore = score
}
}

if bestStore == nil {
if bestStore == nil || filterTarget(bestStore, r.filters) {
return nil, 0
}

Expand All @@ -221,27 +224,26 @@ func (r *replicaChecker) selectBestPeer(region *regionInfo, filters ...Filter) (

// selectWorstPeer returns the worst peer in the region.
func (r *replicaChecker) selectWorstPeer(region *regionInfo, filters ...Filter) (*metapb.Peer, float64) {
filters = append(filters, r.filters...)

var (
worstStore *storeInfo
worstScore float64
)

// Find the store with worst score.
regionStores := r.cluster.getRegionStores(region)
for _, store := range regionStores {
// Select the store with lowest distinct score.
// If the scores are the same, select the store with maximal storage ratio.
stores := r.cluster.getRegionStores(region)
for _, store := range stores {
if filterSource(store, filters) {
continue
}
score := r.rep.GetReplicaScore(regionStores, store)
score := r.rep.GetDistinctScore(stores, store)
if worstStore == nil || compareStoreScore(store, score, worstStore, worstScore) < 0 {
worstStore = store
worstScore = score
}
}

if worstStore == nil {
if worstStore == nil || filterSource(worstStore, r.filters) {
return nil, 0
}
return region.GetStorePeer(worstStore.GetId()), worstScore
Expand All @@ -252,7 +254,6 @@ func (r *replicaChecker) selectBestReplacement(region *regionInfo, peer *metapb.
// Get a new region without the peer we are going to replace.
newRegion := region.clone()
newRegion.RemoveStorePeer(peer.GetStoreId())
// Get the best peer in other stores.
return r.selectBestPeer(newRegion, newExcludedFilter(nil, region.GetStoreIds()))
}

Expand Down Expand Up @@ -280,16 +281,17 @@ func (r *replicaChecker) checkOfflinePeer(region *regionInfo) Operator {
if store.isUp() {
continue
}
newPeer, _ := r.selectBestReplacement(region, peer)
newPeer, _ := r.selectBestPeer(region)
if newPeer == nil {
return nil
}
return newTransferPeer(region, peer, newPeer)
}

return nil
}

func (r *replicaChecker) checkBetterPeer(region *regionInfo) Operator {
func (r *replicaChecker) checkBestReplacement(region *regionInfo) Operator {
oldPeer, oldScore := r.selectWorstPeer(region)
if oldPeer == nil {
return nil
Expand All @@ -298,8 +300,8 @@ func (r *replicaChecker) checkBetterPeer(region *regionInfo) Operator {
if newPeer == nil {
return nil
}
// We can't find a better peer (the lower the better).
if newScore >= oldScore {
// Make sure the new peer is better than the old peer.
if newScore <= oldScore {
return nil
}
return newTransferPeer(region, oldPeer, newPeer)
Expand Down
Loading

0 comments on commit aca3a4a

Please sign in to comment.