server/replication: remove allow-unsafe-balance option
huachaohuang committed Jan 13, 2017
1 parent f25fd88 commit 443d700
Showing 8 changed files with 87 additions and 112 deletions.
7 changes: 1 addition & 6 deletions conf/config.toml
@@ -44,9 +44,4 @@ max-replicas = 3
# The placement priorities are implied by the order of label keys.
# For example, ["zone", "rack"] means that we should place replicas to
# different zones first, then to different racks if we don't have enough zones.
location-labels = []
# Allow to balance even if it is unsafe. The balance algorithm tries its best to
# keep safe, but if resources are limited (not enough zones or racks), or under
# some abnormal conditions (stores are down or full), the current implementation
# can't balance and guarantee replicas are safe at the same time.
#allow-unsafe-balance = false
location-labels = []
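For reference, the surviving location-labels option maps onto the LocationLabels field of ReplicationConfig through its toml struct tag, shown further down in server/config.go. Below is a minimal sketch of how such a snippet decodes, assuming the BurntSushi/toml package; PD's real config loading has more fields, defaults, and validation:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// replicationConfig mirrors only the fields visible in this diff; the real
// ReplicationConfig in server/config.go carries more options.
type replicationConfig struct {
	MaxReplicas    uint64   `toml:"max-replicas"`
	LocationLabels []string `toml:"location-labels"`
}

func main() {
	const data = `
max-replicas = 3
# Place replicas in different zones first, then in different racks.
location-labels = ["zone", "rack"]
`
	var cfg replicationConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxReplicas, cfg.LocationLabels) // 3 [zone rack]
}
```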
42 changes: 27 additions & 15 deletions server/balancer.go
@@ -66,24 +66,28 @@ type storageBalancer struct {
opt *scheduleOption
rep *Replication
cache *idCache
filters []Filter
selector Selector
}

func newStorageBalancer(opt *scheduleOption) *storageBalancer {
cache := newIDCache(storeCacheInterval, 4*storeCacheInterval)

var filters []Filter
filters = append(filters, newCacheFilter(cache))
filters = append(filters, newStateFilter(opt))
filters = append(filters, newHealthFilter(opt))
filters = append(filters, newRegionCountFilter(opt))
filters = append(filters, newSnapshotCountFilter(opt))
var hardFilters []Filter
hardFilters = append(hardFilters, newCacheFilter(cache))
hardFilters = append(hardFilters, newStateFilter(opt))
hardFilters = append(hardFilters, newRegionCountFilter(opt))

var softFilters []Filter
softFilters = append(softFilters, newHealthFilter(opt))
softFilters = append(softFilters, newSnapshotCountFilter(opt))

return &storageBalancer{
opt: opt,
rep: opt.GetReplication(),
cache: cache,
selector: newBalanceSelector(storageKind, filters),
filters: softFilters,
selector: newBalanceSelector(storageKind, hardFilters),
}
}

@@ -130,14 +134,22 @@ func (s *storageBalancer) transferPeer(cluster *clusterInfo, region *regionInfo,
// with the new peer.
distinctGuard := newDistinctScoreFilter(s.rep, stores, source)

// See if we can select a safe one first.
newPeer := scheduleAddPeer(cluster, s.selector, excluded, safeGuard, distinctGuard)
if newPeer == nil && s.rep.AllowUnsafeBalance() {
// If we can't, we find an unsafe one if it is allowed.
newPeer = scheduleAddPeer(cluster, s.selector, excluded, distinctGuard)
}
if newPeer == nil {
return nil
// See if we can select a safe new peer first.
filters := []Filter{excluded, distinctGuard, safeGuard}
newPeer := scheduleAddPeer(cluster, s.selector, filters...)
if newPeer != nil {
// If the safe new peer is not available for now, we wait.
newStore := cluster.getStore(newPeer.GetStoreId())
if filterTarget(newStore, s.filters) {
return nil
}
} else {
// If we can't find a safe new peer, we just find the smallest.
filters = append([]Filter{excluded, distinctGuard}, s.filters...)
newPeer = scheduleAddPeer(cluster, s.selector, filters...)
if newPeer == nil {
return nil
}
}

target := cluster.getStore(newPeer.GetStoreId())
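The reworked transferPeer splits the old single filter list into hard filters wired into the selector (cache, state, region count) and soft filters kept in s.filters (health, snapshot count). If a safe target exists but only the soft filters reject it, the balancer now waits; only when no safe target exists at all does it fall back to the smallest store that passes both sets. Below is a self-contained sketch of that decision shape with simplified stand-ins for storeInfo, Filter, and scheduleAddPeer; the real PD types and selection logic differ:

```go
package main

import "fmt"

// store is a pared-down stand-in for PD's storeInfo.
type store struct {
	id           uint64
	storageRatio float64
	healthy      bool // soft condition: an unhealthy store is only a temporary obstacle
	safe         bool // stands in for the safeGuard replica-score check
}

// filter reports true when a store must be skipped as a target.
type filter func(*store) bool

// filterTarget mirrors the helper of the same name: any filter that rejects
// the store rejects it overall.
func filterTarget(s *store, filters []filter) bool {
	for _, f := range filters {
		if f(s) {
			return true
		}
	}
	return false
}

// pickTarget returns the store with the smallest storage ratio that passes
// every filter, or nil if none qualifies.
func pickTarget(stores []*store, filters ...filter) *store {
	var best *store
	for _, s := range stores {
		if filterTarget(s, filters) {
			continue
		}
		if best == nil || s.storageRatio < best.storageRatio {
			best = s
		}
	}
	return best
}

func main() {
	hard := []filter{func(s *store) bool { return s.id == 1 }} // e.g. exclude the source store
	soft := []filter{func(s *store) bool { return !s.healthy }}
	safeGuard := func(s *store) bool { return !s.safe }

	stores := []*store{
		{id: 1, storageRatio: 0.5, healthy: true, safe: true},
		{id: 2, storageRatio: 0.2, healthy: false, safe: true},
		{id: 3, storageRatio: 0.3, healthy: true, safe: false},
	}

	// Step 1: look for a safe target using only the hard filters.
	candidate := pickTarget(stores, append(hard, safeGuard)...)
	switch {
	case candidate != nil && filterTarget(candidate, soft):
		// A safe target exists but is temporarily unavailable: wait.
		fmt.Println("safe target", candidate.id, "is busy; wait for the next round")
	case candidate != nil:
		fmt.Println("move the peer to safe target", candidate.id)
	default:
		// No safe target at all: fall back to the smallest store that
		// passes both the hard and the soft filters.
		if fallback := pickTarget(stores, append(hard, soft...)...); fallback != nil {
			fmt.Println("no safe target; move the peer to store", fallback.id)
		} else {
			fmt.Println("nothing to do")
		}
	}
}
```

With these inputs the sketch takes the wait branch, because the only safe candidate (store 2) is rejected by the soft health filter.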
120 changes: 50 additions & 70 deletions server/balancer_test.go
@@ -226,12 +226,12 @@ func (s *testStorageBalancerSuite) TestBalance(c *C) {
c.Assert(sb.Schedule(cluster), IsNil)
}

func (s *testStorageBalancerSuite) TestSafe(c *C) {
func (s *testStorageBalancerSuite) TestReplicas3(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, false, "zone", "rack", "host")
opt.rep = newTestReplication(3, "zone", "rack", "host")

sb := newStorageBalancer(opt)

@@ -241,28 +241,61 @@ func (s *testStorageBalancerSuite) TestSafe(c *C) {
tc.addLabelsStore(3, 1, 0.3, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})

tc.addLeaderRegion(1, 1, 2, 3)
// This schedule tries to replace the peer in store 1, but we have no other stores,
// so store 1 will be set in the cache and skipped in the next schedule.
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(1), IsTrue)

// Store 4 has smaller storage ratio than store 2.
tc.addLabelsStore(4, 1, 0.1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
checkTransferPeer(c, sb.Schedule(cluster), 2, 4)

// Store 5 has smaller storage ratio than store 1.
tc.addLabelsStore(5, 1, 0.2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
sb.cache.delete(1) // Delete store 1 from cache, or it will be skipped.
checkTransferPeer(c, sb.Schedule(cluster), 1, 5)

// Store 6 has smaller storage ratio than store 5.
tc.addLabelsStore(6, 1, 0.1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 6)

// Store 4 has smaller storage ratio than store 1, but it is unsafe.
tc.addLabelsStore(4, 1, 0.1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
// Store 7 has a larger storage ratio than store 6, but it is on a different host.
tc.addLabelsStore(7, 1, 0.2, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 7)

// If store 7 is not available, we wait.
tc.setStoreDown(7)
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(1), IsTrue)
tc.setStoreUp(7)
checkTransferPeer(c, sb.Schedule(cluster), 2, 7)
sb.cache.delete(1)
checkTransferPeer(c, sb.Schedule(cluster), 1, 7)

// Store 5 has smaller storage ratio than store 1, and it is safe.
tc.addLabelsStore(5, 1, 0.2, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 5)
// Store 8 has a smaller storage ratio than store 7, but the distinct score would decrease.
tc.addLabelsStore(8, 1, 0.1, map[string]string{"zone": "z1", "rack": "r2", "host": "h3"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 7)

// Store 6 has smaller storage ratio than store 5, but the distinct score decrease.
tc.addLabelsStore(6, 1, 0.1, map[string]string{"zone": "z1", "rack": "r2", "host": "h3"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 5)
// Take down 4,5,6,7
tc.setStoreDown(4)
tc.setStoreDown(5)
tc.setStoreDown(6)
tc.setStoreDown(7)
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(1), IsTrue)
sb.cache.delete(1)

// Store 9 is in a different zone from the other stores but has a larger storage ratio than store 1.
tc.addLabelsStore(9, 1, 0.6, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
c.Assert(sb.Schedule(cluster), IsNil)
}

func (s *testStorageBalancerSuite) TestSafe2(c *C) {
func (s *testStorageBalancerSuite) TestReplicas5(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(5, false, "zone", "rack", "host")
opt.rep = newTestReplication(5, "zone", "rack", "host")

sb := newStorageBalancer(opt)

@@ -274,11 +307,9 @@ func (s *testStorageBalancerSuite) TestSafe2(c *C) {

tc.addLeaderRegion(1, 1, 2, 3, 4, 5)

// Store 6 has smaller ratio, but it is unsafe.
// Store 6 has smaller ratio.
tc.addLabelsStore(6, 1, 0.3, map[string]string{"zone": "z5", "rack": "r2", "host": "h1"})
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(5), IsTrue)
sb.cache.delete(5)
checkTransferPeer(c, sb.Schedule(cluster), 5, 6)

// Store 7 has smaller ratio, and it is safe.
tc.addLabelsStore(7, 1, 0.4, map[string]string{"zone": "z6", "rack": "r1", "host": "h1"})
@@ -296,57 +327,6 @@ func (s *testStorageBalancerSuite) TestSafe2(c *C) {
checkTransferPeer(c, sb.Schedule(cluster), 11, 1)
}

func (s *testStorageBalancerSuite) TestUnsafe(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, true, "zone", "rack", "host")

sb := newStorageBalancer(opt)

// Store 1 has the largest storage ratio, so the balancer try to replace peer in store 1.
tc.addLabelsStore(1, 1, 0.5, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.addLabelsStore(2, 1, 0.4, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
tc.addLabelsStore(3, 1, 0.3, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})

tc.addLeaderRegion(1, 1, 2, 3)
// This schedule try to replace peer in store 1, but we have no other stores,
// so store 1 will be set in the cache and skipped next schedule.
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(1), IsTrue)

// Store 4 has smaller storage ratio than store 2.
tc.addLabelsStore(4, 1, 0.1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
checkTransferPeer(c, sb.Schedule(cluster), 2, 4)

// Store 5 has smaller storage ratio than store 1.
tc.addLabelsStore(5, 1, 0.2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
sb.cache.delete(1) // Delete store 1 from cache, or it will be skipped.
checkTransferPeer(c, sb.Schedule(cluster), 1, 5)

// Store 6 has smaller storage ratio than store 5.
tc.addLabelsStore(6, 1, 0.1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 6)

// Store 7 has the same storage ratio with store 6, but in a different host.
tc.addLabelsStore(7, 1, 0.1, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"})
checkTransferPeer(c, sb.Schedule(cluster), 1, 7)

// Take down 4,5,6,7
tc.setStoreDown(4)
tc.setStoreDown(5)
tc.setStoreDown(6)
tc.setStoreDown(7)
c.Assert(sb.Schedule(cluster), IsNil)
c.Assert(sb.cache.get(1), IsTrue)
sb.cache.delete(1)

// Store 7 has different zone with other stores but larger storage ratio than store 1.
tc.addLabelsStore(8, 1, 0.6, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
c.Assert(sb.Schedule(cluster), IsNil)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
@@ -426,7 +406,7 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, false, "zone", "rack", "host")
opt.rep = newTestReplication(3, "zone", "rack", "host")

rc := newReplicaChecker(opt, cluster)

@@ -475,7 +455,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, false, "zone", "rack", "host")
opt.rep = newTestReplication(3, "zone", "rack", "host")

rc := newReplicaChecker(opt, cluster)

@@ -554,7 +534,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(5, false, "zone", "host")
opt.rep = newTestReplication(5, "zone", "host")

rc := newReplicaChecker(opt, cluster)

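The storage-balancer tests above rely on sb.cache to remember a store the balancer has temporarily given up on (c.Assert(sb.cache.get(1), IsTrue), later cleared with sb.cache.delete(1)). The sketch below is a hypothetical, much-simplified TTL cache that illustrates that behaviour; the real idCache created by newIDCache(storeCacheInterval, 4*storeCacheInterval) takes two intervals and its exact semantics are not reproduced here:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// idCache remembers IDs for a limited time so callers can skip them
// until the entry expires or is deleted explicitly.
type idCache struct {
	mu  sync.Mutex
	ttl time.Duration
	ids map[uint64]time.Time
}

func newIDCache(ttl time.Duration) *idCache {
	return &idCache{ttl: ttl, ids: make(map[uint64]time.Time)}
}

// set records the ID with an expiry deadline.
func (c *idCache) set(id uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ids[id] = time.Now().Add(c.ttl)
}

// get reports whether the ID is still cached, dropping expired entries.
func (c *idCache) get(id uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	deadline, ok := c.ids[id]
	if !ok {
		return false
	}
	if time.Now().After(deadline) {
		delete(c.ids, id)
		return false
	}
	return true
}

// delete removes the ID immediately, as the tests do with sb.cache.delete(1).
func (c *idCache) delete(id uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.ids, id)
}

func main() {
	cache := newIDCache(50 * time.Millisecond)
	cache.set(1)
	fmt.Println(cache.get(1)) // true: store 1 is skipped for now
	time.Sleep(60 * time.Millisecond)
	fmt.Println(cache.get(1)) // false: the entry has expired
}
```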
7 changes: 0 additions & 7 deletions server/config.go
@@ -340,13 +340,6 @@ type ReplicationConfig struct {
// For example, ["zone", "rack"] means that we should place replicas to
// different zones first, then to different racks if we don't have enough zones.
LocationLabels []string `toml:"location-labels" json:"location-labels"`

// Allow to balance even if it is unsafe. The balance algorithm tries its
// best to keep safe, but if resources are limited (not enough zones or
// racks), or under some abnormal conditions (stores are down or full), the
// current implementation can't balance and guarantee replicas are safe at
// the same time.
AllowUnsafeBalance bool `toml:"allow-unsafe-balance" json:"allow-unsafe-balance"`
}

func (c *ReplicationConfig) adjust() {
3 changes: 2 additions & 1 deletion server/filter.go
@@ -211,7 +211,8 @@ func (f *replicaScoreFilter) FilterSource(store *storeInfo) bool {
}

func (f *replicaScoreFilter) FilterTarget(store *storeInfo) bool {
return f.rep.GetReplicaScore(append(f.stores, store)) < f.safeScore
newStores := append([]*storeInfo{store}, f.stores...)
return f.rep.GetReplicaScore(newStores) < f.safeScore
}

// distinctScoreFilter ensures that distinct score will not decrease.
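One reason to build the candidate list as append([]*storeInfo{store}, f.stores...) instead of append(f.stores, store) is that appending to a long-lived slice silently reuses its backing array whenever spare capacity exists, so repeated FilterTarget calls could interfere with each other or corrupt f.stores. Whether or not that was the motivation for this particular change, the hazard itself is easy to demonstrate with a small, self-contained example:

```go
package main

import "fmt"

func main() {
	// base stands in for a long-lived slice held by a filter, such as
	// replicaScoreFilter.stores. Give it spare capacity on purpose.
	base := make([]string, 2, 4)
	base[0], base[1] = "s1", "s2"

	// Appending the candidate directly reuses base's backing array while
	// capacity allows, so two "independent" results stomp on each other.
	a := append(base, "candidate-A")
	b := append(base, "candidate-B")
	fmt.Println(a[2], b[2]) // candidate-B candidate-B: A was overwritten

	// Building a fresh slice, as the new FilterTarget does, keeps every
	// call independent of the filter's own data.
	c := append([]string{"candidate-A"}, base...)
	d := append([]string{"candidate-B"}, base...)
	fmt.Println(c[0], d[0]) // candidate-A candidate-B
}
```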
5 changes: 0 additions & 5 deletions server/replication.go
@@ -34,11 +34,6 @@ func (r *Replication) GetMaxReplicas() int {
return int(r.cfg.MaxReplicas)
}

// AllowUnsafeBalance returns true if it is allowed to do unsafe balance.
func (r *Replication) AllowUnsafeBalance() bool {
return r.cfg.AllowUnsafeBalance
}

// GetReplicaScore returns the failure tolerance score of these replicas.
// A higher score means these replicas can tolerate more kinds of failures.
func (r *Replication) GetReplicaScore(replicas []*storeInfo) float64 {
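GetReplicaScore is documented above as a failure-tolerance score computed over the configured location labels. The function below is purely illustrative and is not PD's formula: it only counts how many leading label levels keep every replica on a distinct value, which is enough to show why a higher score tolerates more kinds of failures:

```go
package main

import "fmt"

// replicaScore is an illustrative stand-in for Replication.GetReplicaScore.
// It counts the leading location levels ("zone", then "rack", then "host")
// at which all replicas sit on distinct values.
func replicaScore(labels []string, replicas []map[string]string) int {
	score := 0
	for _, label := range labels {
		seen := make(map[string]bool)
		distinct := true
		for _, r := range replicas {
			v := r[label]
			if v == "" || seen[v] {
				distinct = false
				break
			}
			seen[v] = true
		}
		if !distinct {
			break
		}
		score++
	}
	return score
}

func main() {
	labels := []string{"zone", "rack", "host"}
	sameZone := []map[string]string{
		{"zone": "z1", "rack": "r1", "host": "h1"},
		{"zone": "z1", "rack": "r2", "host": "h1"},
		{"zone": "z1", "rack": "r3", "host": "h1"},
	}
	threeZones := []map[string]string{
		{"zone": "z1", "rack": "r1", "host": "h1"},
		{"zone": "z2", "rack": "r1", "host": "h1"},
		{"zone": "z3", "rack": "r1", "host": "h1"},
	}
	fmt.Println(replicaScore(labels, sameZone))   // 0: one zone failure can lose every replica
	fmt.Println(replicaScore(labels, threeZones)) // 1: replicas survive any single-zone failure
}
```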
11 changes: 5 additions & 6 deletions server/replication_test.go
@@ -15,11 +15,10 @@ package server

import . "github.com/pingcap/check"

func newTestReplication(maxReplicas int, unsafeBalance bool, locationLabels ...string) *Replication {
func newTestReplication(maxReplicas int, locationLabels ...string) *Replication {
cfg := &ReplicationConfig{
MaxReplicas: uint64(maxReplicas),
LocationLabels: locationLabels,
AllowUnsafeBalance: unsafeBalance,
MaxReplicas: uint64(maxReplicas),
LocationLabels: locationLabels,
}
return newReplication(cfg)
}
@@ -31,7 +30,7 @@ type testReplicationSuite struct{}
func (s *testReplicationSuite) TestReplicaScore(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)
rep := newTestReplication(3, false, "zone", "rack", "host")
rep := newTestReplication(3, "zone", "rack", "host")

zones := []string{"z1", "z2", "z3"}
racks := []string{"r1", "r2", "r3"}
@@ -70,7 +69,7 @@ func (s *testReplicationSuite) TestDistinctScore(c *C) {
func (s *testReplicationSuite) TestDistinctScore(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)
rep := newTestReplication(3, false, "zone", "rack", "host")
rep := newTestReplication(3, "zone", "rack", "host")

zones := []string{"z1", "z2", "z3"}
racks := []string{"r1", "r2", "r3"}
4 changes: 2 additions & 2 deletions server/selector.go
@@ -34,7 +34,7 @@ func newBalanceSelector(kind ResourceKind, filters []Filter) *balanceSelector {
}

func (s *balanceSelector) SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo {
filters = append(s.filters, filters...)
filters = append(filters, s.filters...)

var result *storeInfo
for _, store := range stores {
@@ -49,7 +49,7 @@ func (s *balanceSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *
}

func (s *balanceSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo {
filters = append(s.filters, filters...)
filters = append(filters, s.filters...)

var result *storeInfo
for _, store := range stores {