server/balancer: guarantee replicas are safe if possible #473

Merged · 6 commits · Jan 21, 2017
2 changes: 1 addition & 1 deletion conf/config.toml
@@ -44,4 +44,4 @@ max-replicas = 3
# The placement priorities is implied by the order of label keys.
# For example, ["zone", "rack"] means that we should place replicas to
# different zones first, then to different racks if we don't have enough zones.
location-labels = []
location-labels = []
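
The comment above explains how ordered location labels such as ["zone", "rack"] set placement priorities; the GetDistinctScore calls in the server/balancer.go diff below build on the same idea. The following is a simplified, self-contained illustration of such a score — not pd's actual implementation: a candidate that diverges from the existing replicas at an earlier label (a different zone) scores higher than one that only diverges at a later label (same zone, different rack).

```go
// Simplified illustration only — not pd's actual GetDistinctScore.
package main

import "fmt"

type store struct {
	id     uint64
	labels map[string]string // e.g. {"zone": "z1", "rack": "r1"}
}

// distinctScore sums, over the stores that already hold the region's replicas,
// how early the candidate diverges from each of them in the label order.
// Higher means the candidate adds more isolation.
func distinctScore(labelKeys []string, existing []store, candidate store) float64 {
	var score float64
	for _, s := range existing {
		if s.id == candidate.id {
			continue
		}
		matched := 0
		for _, key := range labelKeys {
			if s.labels[key] != candidate.labels[key] {
				break
			}
			matched++
		}
		// Fewer matching leading labels => more distinct => larger contribution.
		score += float64(len(labelKeys) - matched)
	}
	return score
}

func main() {
	labelKeys := []string{"zone", "rack"}
	existing := []store{
		{id: 1, labels: map[string]string{"zone": "z1", "rack": "r1"}},
		{id: 2, labels: map[string]string{"zone": "z2", "rack": "r1"}},
	}
	sameZone := store{id: 3, labels: map[string]string{"zone": "z1", "rack": "r2"}}
	newZone := store{id: 4, labels: map[string]string{"zone": "z3", "rack": "r1"}}
	fmt.Println(distinctScore(labelKeys, existing, sameZone)) // 3
	fmt.Println(distinctScore(labelKeys, existing, newZone))  // 4: preferred
}
```
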
50 changes: 26 additions & 24 deletions server/balancer.go
@@ -30,6 +30,7 @@ type leaderBalancer struct {
func newLeaderBalancer(opt *scheduleOption) *leaderBalancer {
var filters []Filter
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newLeaderCountFilter(opt))

return &leaderBalancer{
@@ -74,6 +75,7 @@ func newStorageBalancer(opt *scheduleOption) *storageBalancer {
var filters []Filter
filters = append(filters, newCacheFilter(cache))
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newRegionCountFilter(opt))
filters = append(filters, newSnapshotCountFilter(opt))

@@ -115,14 +117,13 @@ func (s *storageBalancer) Schedule(cluster *clusterInfo) Operator {
}

func (s *storageBalancer) transferPeer(cluster *clusterInfo, region *regionInfo, oldPeer *metapb.Peer) Operator {
// scoreGuard guarantees that the distinct score will not decrease.
stores := cluster.getRegionStores(region)
source := cluster.getStore(oldPeer.GetStoreId())
scoreGuard := newDistinctScoreFilter(s.rep, stores, source)

// Allocate a new peer from the store with smallest storage ratio.
// We need to ensure the target store will not break the replication constraints.
excluded := newExcludedFilter(nil, region.GetStoreIds())
replication := newReplicationFilter(s.rep, stores, source)
newPeer := scheduleAddPeer(cluster, s.selector, excluded, replication)
checker := newReplicaChecker(s.opt, cluster)
newPeer, _ := checker.selectBestPeer(region, scoreGuard)
if newPeer == nil {
return nil
}
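
The scoreGuard above is what makes a plain storage rebalance respect replica safety: the target for the moved peer may not have a lower distinct score than the store it leaves. Below is a minimal sketch of that idea; the names scoreGuard, newScoreGuard, and rejectTarget are illustrative stand-ins, not the real newDistinctScoreFilter from this diff.

```go
// Sketch only (assumed shape, not pd's newDistinctScoreFilter): remember the
// distinct score of the store a peer is being moved away from, and reject any
// target whose distinct score against the region's replicas would be lower,
// so rebalancing never makes the placement less fault tolerant.
package main

import "fmt"

// scoreOf stands in for the replication's GetDistinctScore: it rates how
// distinct a store is from the stores that already hold the region's replicas.
type scoreOf func(storeID uint64) float64

type scoreGuard struct {
	score     scoreOf
	safeScore float64 // score of the source store the peer is leaving
}

func newScoreGuard(score scoreOf, sourceID uint64) scoreGuard {
	return scoreGuard{score: score, safeScore: score(sourceID)}
}

// rejectTarget reports whether the candidate must be rejected as a target.
func (g scoreGuard) rejectTarget(candidateID uint64) bool {
	return g.score(candidateID) < g.safeScore
}

func main() {
	// Hypothetical scores: store 4 sits in an already-used zone, store 5 does not.
	scores := map[uint64]float64{1: 2, 4: 1, 5: 3}
	guard := newScoreGuard(func(id uint64) float64 { return scores[id] }, 1)
	fmt.Println(guard.rejectTarget(4)) // true: the move would lose isolation
	fmt.Println(guard.rejectTarget(5)) // false: at least as safe as before
}
```
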
@@ -145,7 +146,7 @@ type replicaChecker struct {

func newReplicaChecker(opt *scheduleOption, cluster *clusterInfo) *replicaChecker {
var filters []Filter
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newSnapshotCountFilter(opt))
filters = append(filters, newStorageThresholdFilter(opt))

@@ -181,33 +182,35 @@ func (r *replicaChecker) Check(region *regionInfo) Operator {
return newRemovePeer(region, oldPeer)
}

return r.checkBetterPeer(region)
return r.checkBestReplacement(region)
}

// selectBestPeer returns the best peer in other stores.
func (r *replicaChecker) selectBestPeer(region *regionInfo, filters ...Filter) (*metapb.Peer, float64) {
filters = append(filters, r.filters...)
// Add some must have filters.
filters = append(filters, newStateFilter(r.opt))
Member

How about adding a healthFilter?

Contributor Author

The healthFilter is applied after the best store has been found; applying it earlier could filter out the best store just because it is temporarily unavailable.

filters = append(filters, newExcludedFilter(nil, region.GetStoreIds()))

var (
bestStore *storeInfo
bestScore float64
)

// Find the store with best score.
regionStores := r.cluster.getRegionStores(region)
// Select the store with best distinct score.
// If the scores are the same, select the store with minimal storage ratio.
stores := r.cluster.getRegionStores(region)
for _, store := range r.cluster.getStores() {
if filterTarget(store, filters) {
continue
}
score := r.rep.GetReplicaScore(regionStores, store)
score := r.rep.GetDistinctScore(stores, store)
if bestStore == nil || compareStoreScore(store, score, bestStore, bestScore) > 0 {
bestStore = store
bestScore = score
}
}

if bestStore == nil {
if bestStore == nil || filterTarget(bestStore, r.filters) {
Contributor

Why not check r.filters along with filters?

Contributor Author

r.filters are used to filter out temporarily unavailable stores. If we applied them before calculating the scores, we might end up choosing an unsafe store.

Contributor

I see.

return nil, 0
}
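
A minimal sketch of the ordering debated in the thread above, with hypothetical names (candidate, selectBest): the must-have filters and scoring run over every store, and the availability filters (this checker's r.filters: health, snapshot count, storage threshold) are applied only to the winner. Filtering unavailable stores out before scoring could silently fall back to a second-best, less safe placement; checking them last means the scheduler returns nothing and retries later instead.

```go
// Illustrative only — not the code in this diff. Availability is checked on
// the best-scoring store rather than used to pre-filter the candidates.
package main

import "fmt"

type candidate struct {
	id            uint64
	distinctScore float64
	available     bool // healthy, not busy with snapshots, below storage threshold
}

// selectBest picks the candidate with the highest distinct score, then applies
// the availability check to that winner only.
func selectBest(cands []candidate) *candidate {
	var best *candidate
	for i := range cands {
		c := &cands[i]
		if best == nil || c.distinctScore > best.distinctScore {
			best = c
		}
	}
	if best == nil || !best.available {
		return nil // wait and retry rather than settle for a less safe store
	}
	return best
}

func main() {
	cands := []candidate{
		{id: 1, distinctScore: 2, available: true},
		{id: 2, distinctScore: 4, available: false}, // safest store, but busy right now
	}
	fmt.Println(selectBest(cands)) // <nil>: do not fall back to store 1
}
```
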

@@ -221,27 +224,26 @@ func (r *replicaChecker) selectBestPeer(region *regionInfo, filters ...Filter) (

// selectWorstPeer returns the worst peer in the region.
func (r *replicaChecker) selectWorstPeer(region *regionInfo, filters ...Filter) (*metapb.Peer, float64) {
filters = append(filters, r.filters...)

var (
worstStore *storeInfo
worstScore float64
)

// Find the store with worst score.
regionStores := r.cluster.getRegionStores(region)
for _, store := range regionStores {
// Select the store with lowest distinct score.
// If the scores are the same, select the store with maximal storage ratio.
stores := r.cluster.getRegionStores(region)
for _, store := range stores {
if filterSource(store, filters) {
continue
}
score := r.rep.GetReplicaScore(regionStores, store)
score := r.rep.GetDistinctScore(stores, store)
if worstStore == nil || compareStoreScore(store, score, worstStore, worstScore) < 0 {
worstStore = store
worstScore = score
}
}

if worstStore == nil {
if worstStore == nil || filterSource(worstStore, r.filters) {
return nil, 0
}
return region.GetStorePeer(worstStore.GetId()), worstScore
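
The comments in selectBestPeer and selectWorstPeer describe the tie-breaking rule behind compareStoreScore. Below is a self-contained sketch of that rule, with hypothetical names (storeStat, compareStore) rather than the real helper: a higher distinct score wins outright; on equal scores, the store with the smaller storage ratio is the better target, and symmetrically the one with the larger ratio is the worse source.

```go
// Illustrative sketch of the comparison rule, not pd's compareStoreScore.
package main

import "fmt"

type storeStat struct {
	id           uint64
	storageRatio float64 // used space / capacity, in [0, 1]
}

// compareStore returns >0 if (a, scoreA) is a better target than (b, scoreB),
// <0 if it is worse, and 0 if the two are equivalent. selectBestPeer keeps a
// store when the result is >0; selectWorstPeer keeps it when the result is <0.
func compareStore(a storeStat, scoreA float64, b storeStat, scoreB float64) int {
	switch {
	case scoreA > scoreB:
		return 1
	case scoreA < scoreB:
		return -1
	case a.storageRatio < b.storageRatio:
		return 1
	case a.storageRatio > b.storageRatio:
		return -1
	default:
		return 0
	}
}

func main() {
	a := storeStat{id: 1, storageRatio: 0.3}
	b := storeStat{id: 2, storageRatio: 0.6}
	fmt.Println(compareStore(a, 2, b, 2)) // 1: equal scores, a is less full
	fmt.Println(compareStore(a, 1, b, 2)) // -1: b has the higher distinct score
}
```
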
@@ -252,7 +254,6 @@ func (r *replicaChecker) selectBestReplacement(region *regionInfo, peer *metapb.
// Get a new region without the peer we are going to replace.
newRegion := region.clone()
newRegion.RemoveStorePeer(peer.GetStoreId())
// Get the best peer in other stores.
return r.selectBestPeer(newRegion, newExcludedFilter(nil, region.GetStoreIds()))
}

@@ -280,16 +281,17 @@ func (r *replicaChecker) checkOfflinePeer(region *regionInfo) Operator {
if store.isUp() {
continue
}
newPeer, _ := r.selectBestReplacement(region, peer)
newPeer, _ := r.selectBestPeer(region)
if newPeer == nil {
return nil
}
return newTransferPeer(region, peer, newPeer)
}

return nil
}

func (r *replicaChecker) checkBetterPeer(region *regionInfo) Operator {
func (r *replicaChecker) checkBestReplacement(region *regionInfo) Operator {
oldPeer, oldScore := r.selectWorstPeer(region)
if oldPeer == nil {
return nil
@@ -298,8 +300,8 @@ func (r *replicaChecker) checkBetterPeer(region *regionInfo) Operator {
if newPeer == nil {
return nil
}
// We can't find a better peer (the lower the better).
if newScore >= oldScore {
// Make sure the new peer is better than the old peer.
if newScore <= oldScore {
return nil
}
return newTransferPeer(region, oldPeer, newPeer)