diff --git a/server/cache.go b/server/cache.go
index 884666f5504..b3233381e01 100644
--- a/server/cache.go
+++ b/server/cache.go
@@ -29,7 +29,7 @@ import (
 
 type clusterInfo struct {
 	sync.RWMutex
-	*schedule.BasicCluster
+	core *schedule.BasicCluster
 
 	id core.IDAllocator
 	kv *core.KV
@@ -42,7 +42,7 @@ type clusterInfo struct {
 
 func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo {
 	return &clusterInfo{
-		BasicCluster: schedule.NewBasicCluster(),
+		core:         schedule.NewBasicCluster(),
 		id:           id,
 		opt:          opt,
 		kv:           kv,
@@ -64,16 +64,16 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
 	}
 
 	start := time.Now()
-	if err := kv.LoadStores(c.Stores); err != nil {
+	if err := kv.LoadStores(c.core.Stores); err != nil {
 		return nil, errors.Trace(err)
 	}
-	log.Infof("load %v stores cost %v", c.Stores.GetStoreCount(), time.Since(start))
+	log.Infof("load %v stores cost %v", c.core.Stores.GetStoreCount(), time.Since(start))
 
 	start = time.Now()
-	if err := kv.LoadRegions(c.Regions); err != nil {
+	if err := kv.LoadRegions(c.core.Regions); err != nil {
 		return nil, errors.Trace(err)
 	}
-	log.Infof("load %v regions cost %v", c.Regions.GetRegionCount(), time.Since(start))
+	log.Infof("load %v regions cost %v", c.core.Regions.GetRegionCount(), time.Since(start))
 	return c, nil
 }
 
@@ -128,7 +128,7 @@ func (c *clusterInfo) putMetaLocked(meta *metapb.Cluster) error {
 func (c *clusterInfo) GetStore(storeID uint64) *core.StoreInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.GetStore(storeID)
+	return c.core.GetStore(storeID)
 }
 
 func (c *clusterInfo) putStore(store *core.StoreInfo) error {
@@ -143,97 +143,97 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
 			return errors.Trace(err)
 		}
 	}
-	return c.BasicCluster.PutStore(store)
+	return c.core.PutStore(store)
 }
 
 // BlockStore stops balancer from selecting the store.
 func (c *clusterInfo) BlockStore(storeID uint64) error {
 	c.Lock()
 	defer c.Unlock()
-	return c.BasicCluster.BlockStore(storeID)
+	return c.core.BlockStore(storeID)
 }
 
 // UnblockStore allows balancer to select the store.
 func (c *clusterInfo) UnblockStore(storeID uint64) {
 	c.Lock()
 	defer c.Unlock()
-	c.BasicCluster.UnblockStore(storeID)
+	c.core.UnblockStore(storeID)
 }
 
 // GetStores returns all stores in the cluster.
 func (c *clusterInfo) GetStores() []*core.StoreInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.GetStores()
+	return c.core.GetStores()
 }
 
 func (c *clusterInfo) getMetaStores() []*metapb.Store {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Stores.GetMetaStores()
+	return c.core.Stores.GetMetaStores()
 }
 
 func (c *clusterInfo) getStoreCount() int {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Stores.GetStoreCount()
+	return c.core.Stores.GetStoreCount()
 }
 
 func (c *clusterInfo) getStoresWriteStat() map[uint64]uint64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Stores.GetStoresWriteStat()
+	return c.core.Stores.GetStoresWriteStat()
 }
 
 func (c *clusterInfo) getStoresReadStat() map[uint64]uint64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Stores.GetStoresReadStat()
+	return c.core.Stores.GetStoresReadStat()
 }
 
 // ScanRegions scans region with start key, until number greater than limit.
 func (c *clusterInfo) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.ScanRange(startKey, limit)
+	return c.core.Regions.ScanRange(startKey, limit)
 }
 
 // GetAdjacentRegions returns region's info that is adjacent with specific region
 func (c *clusterInfo) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.GetAdjacentRegions(region)
+	return c.core.GetAdjacentRegions(region)
 }
 
 // GetRegion searches for a region by ID.
 func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.GetRegion(regionID)
+	return c.core.GetRegion(regionID)
 }
 
 // IsRegionHot checks if a region is in hot state.
 func (c *clusterInfo) IsRegionHot(id uint64) bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.IsRegionHot(id, c.GetHotRegionLowThreshold())
+	return c.core.IsRegionHot(id, c.GetHotRegionLowThreshold())
 }
 
 // RandHotRegionFromStore randomly picks a hot region in specified store.
 func (c *clusterInfo) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	r := c.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
+	r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
 	if r == nil {
 		return nil
 	}
-	return c.BasicCluster.GetRegion(r.RegionID)
+	return c.core.GetRegion(r.RegionID)
 }
 
 func (c *clusterInfo) searchRegion(regionKey []byte) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.SearchRegion(regionKey)
+	return c.core.Regions.SearchRegion(regionKey)
 }
 
 func (c *clusterInfo) putRegion(region *core.RegionInfo) error {
@@ -248,91 +248,91 @@ func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error {
 			return errors.Trace(err)
 		}
 	}
-	return c.BasicCluster.PutRegion(region)
+	return c.core.PutRegion(region)
 }
 
 func (c *clusterInfo) getRegions() []*core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetRegions()
+	return c.core.Regions.GetRegions()
 }
 
 func (c *clusterInfo) randomRegion(opts ...core.RegionOption) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.RandRegion(opts...)
+	return c.core.Regions.RandRegion(opts...)
 }
 
 func (c *clusterInfo) getMetaRegions() []*metapb.Region {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetMetaRegions()
+	return c.core.Regions.GetMetaRegions()
 }
 
 func (c *clusterInfo) getRegionCount() int {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetRegionCount()
+	return c.core.Regions.GetRegionCount()
 }
 
 func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *core.RegionStats {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetRegionStats(startKey, endKey)
+	return c.core.Regions.GetRegionStats(startKey, endKey)
 }
 
 func (c *clusterInfo) dropRegion(id uint64) {
 	c.Lock()
 	defer c.Unlock()
-	if region := c.BasicCluster.GetRegion(id); region != nil {
-		c.Regions.RemoveRegion(region)
+	if region := c.core.GetRegion(id); region != nil {
+		c.core.Regions.RemoveRegion(region)
 	}
 }
 
 func (c *clusterInfo) getStoreRegionCount(storeID uint64) int {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetStoreRegionCount(storeID)
+	return c.core.Regions.GetStoreRegionCount(storeID)
 }
 
 func (c *clusterInfo) getStoreLeaderCount(storeID uint64) int {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Regions.GetStoreLeaderCount(storeID)
+	return c.core.Regions.GetStoreLeaderCount(storeID)
 }
 
 // RandLeaderRegion returns a random region that has leader on the store.
 func (c *clusterInfo) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.RandLeaderRegion(storeID, opts...)
+	return c.core.RandLeaderRegion(storeID, opts...)
 }
 
 // RandFollowerRegion returns a random region that has a follower on the store.
 func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.RandFollowerRegion(storeID, opts...)
+	return c.core.RandFollowerRegion(storeID, opts...)
 }
 
 // GetAverageRegionSize returns the average region approximate size.
 func (c *clusterInfo) GetAverageRegionSize() int64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.BasicCluster.GetAverageRegionSize()
+	return c.core.GetAverageRegionSize()
 }
 
 // GetRegionStores returns all stores that contains the region's peer.
 func (c *clusterInfo) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.getRegionStores(region)
+	return c.getRegionStoresLocked(region)
 }
 
-func (c *clusterInfo) getRegionStores(region *core.RegionInfo) []*core.StoreInfo {
+func (c *clusterInfo) getRegionStoresLocked(region *core.RegionInfo) []*core.StoreInfo {
 	var stores []*core.StoreInfo
 	for id := range region.GetStoreIds() {
-		if store := c.Stores.GetStore(id); store != nil {
+		if store := c.core.Stores.GetStore(id); store != nil {
 			stores = append(stores, store)
 		}
 	}
@@ -343,7 +343,7 @@ func (c *clusterInfo) getRegionStores(region *core.RegionInfo) []*core.StoreInfo
 func (c *clusterInfo) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
 	c.RLock()
 	defer c.RUnlock()
-	return c.Stores.GetStore(region.Leader.GetStoreId())
+	return c.core.Stores.GetStore(region.Leader.GetStoreId())
 }
 
 // GetFollowerStores returns all stores that contains the region's follower peer.
@@ -352,7 +352,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionIn
 	defer c.RUnlock()
 	var stores []*core.StoreInfo
 	for id := range region.GetFollowers() {
-		if store := c.Stores.GetStore(id); store != nil {
+		if store := c.core.Stores.GetStore(id); store != nil {
 			stores = append(stores, store)
 		}
 	}
@@ -363,7 +363,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionIn
 func (c *clusterInfo) isPrepared() bool {
 	c.RLock()
 	defer c.RUnlock()
-	return float64(c.Regions.Length())*collectFactor <= float64(c.activeRegions)
+	return float64(c.core.Regions.Length())*collectFactor <= float64(c.activeRegions)
 }
 
 // handleStoreHeartbeat updates the store status.
@@ -372,32 +372,32 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
 	defer c.Unlock()
 
 	storeID := stats.GetStoreId()
-	store := c.Stores.GetStore(storeID)
+	store := c.core.Stores.GetStore(storeID)
 	if store == nil {
 		return errors.Trace(core.ErrStoreNotFound(storeID))
 	}
 	store.Stats = proto.Clone(stats).(*pdpb.StoreStats)
 	store.LastHeartbeatTS = time.Now()
 
-	c.Stores.SetStore(store)
+	c.core.Stores.SetStore(store)
 	return nil
 }
 
-func (c *clusterInfo) updateStoreStatus(id uint64) {
-	c.Stores.SetLeaderCount(id, c.Regions.GetStoreLeaderCount(id))
-	c.Stores.SetRegionCount(id, c.Regions.GetStoreRegionCount(id))
-	c.Stores.SetPendingPeerCount(id, c.Regions.GetStorePendingPeerCount(id))
-	c.Stores.SetLeaderSize(id, c.Regions.GetStoreLeaderRegionSize(id))
-	c.Stores.SetRegionSize(id, c.Regions.GetStoreRegionSize(id))
+func (c *clusterInfo) updateStoreStatusLocked(id uint64) {
+	c.core.Stores.SetLeaderCount(id, c.core.Regions.GetStoreLeaderCount(id))
+	c.core.Stores.SetRegionCount(id, c.core.Regions.GetStoreRegionCount(id))
+	c.core.Stores.SetPendingPeerCount(id, c.core.Regions.GetStorePendingPeerCount(id))
+	c.core.Stores.SetLeaderSize(id, c.core.Regions.GetStoreLeaderRegionSize(id))
+	c.core.Stores.SetRegionSize(id, c.core.Regions.GetStoreRegionSize(id))
 }
 
 // handleRegionHeartbeat updates the region information.
 func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 	region = region.Clone()
 	c.RLock()
-	origin := c.Regions.GetRegion(region.GetId())
-	isWriteUpdate, writeItem := c.CheckWriteStatus(region)
-	isReadUpdate, readItem := c.CheckReadStatus(region)
+	origin := c.core.Regions.GetRegion(region.GetId())
+	isWriteUpdate, writeItem := c.core.CheckWriteStatus(region)
+	isReadUpdate, readItem := c.core.CheckReadStatus(region)
 	c.RUnlock()
 
 	// Save to KV if meta is updated.
@@ -458,7 +458,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 	}
 
 	if saveCache {
-		overlaps := c.Regions.SetRegion(region)
+		overlaps := c.core.Regions.SetRegion(region)
 		if c.kv != nil {
 			for _, item := range overlaps {
 				if err := c.kv.DeleteRegion(item); err != nil {
@@ -476,24 +476,24 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 		// Update related stores.
 		if origin != nil {
 			for _, p := range origin.Peers {
-				c.updateStoreStatus(p.GetStoreId())
+				c.updateStoreStatusLocked(p.GetStoreId())
 			}
 		}
 		for _, p := range region.Peers {
-			c.updateStoreStatus(p.GetStoreId())
+			c.updateStoreStatusLocked(p.GetStoreId())
 		}
 	}
 
 	if c.regionStats != nil {
-		c.regionStats.Observe(region, c.getRegionStores(region))
+		c.regionStats.Observe(region, c.getRegionStoresLocked(region))
 	}
 
 	key := region.GetId()
 	if isWriteUpdate {
-		c.HotCache.Update(key, writeItem, schedule.WriteFlow)
+		c.core.HotCache.Update(key, writeItem, schedule.WriteFlow)
 	}
 	if isReadUpdate {
-		c.HotCache.Update(key, readItem, schedule.ReadFlow)
+		c.core.HotCache.Update(key, readItem, schedule.ReadFlow)
 	}
 	return nil
 }
@@ -502,7 +502,7 @@ func (c *clusterInfo) updateRegionsLabelLevelStats(regions []*core.RegionInfo) {
 	c.Lock()
 	defer c.Unlock()
 	for _, region := range regions {
-		c.labelLevelStats.Observe(region, c.getRegionStores(region), c.GetLocationLabels())
+		c.labelLevelStats.Observe(region, c.getRegionStoresLocked(region), c.GetLocationLabels())
 	}
 }
 
@@ -515,7 +515,7 @@ func (c *clusterInfo) collectMetrics() {
 	c.regionStats.Collect()
 	c.labelLevelStats.Collect()
 	// collect hot cache metrics
-	c.HotCache.CollectMetrics(c.Stores)
+	c.core.HotCache.CollectMetrics(c.core.Stores)
 }
 
 func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo {
@@ -602,3 +602,15 @@ func (c *clusterInfo) IsRaftLearnerEnabled() bool {
 func (c *clusterInfo) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
 	return c.opt.CheckLabelProperty(typ, labels)
 }
+
+// RegionReadStats returns hot region's read stats.
+func (c *clusterInfo) RegionReadStats() []*core.RegionStat {
+	// RegionStats is a thread-safe method
+	return c.core.HotCache.RegionStats(schedule.ReadFlow)
+}
+
+// RegionWriteStats returns hot region's write stats.
+func (c *clusterInfo) RegionWriteStats() []*core.RegionStat {
+	// RegionStats is a thread-safe method
+	return c.core.HotCache.RegionStats(schedule.WriteFlow)
+}
diff --git a/server/cache_test.go b/server/cache_test.go
index 83496314663..d1087ae0010 100644
--- a/server/cache_test.go
+++ b/server/cache_test.go
@@ -355,12 +355,12 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 	for i, region := range regions {
 		// region does not exist.
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 
 		// region is the same, not updated.
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 		epoch := region.Clone().GetRegionEpoch()
 
@@ -370,7 +370,7 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 			Version: epoch.GetVersion() + 1,
 		}
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 
 		// region is stale (Version).
@@ -379,7 +379,7 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 			ConfVer: epoch.GetConfVer() + 1,
 		}
 		c.Assert(cache.handleRegionHeartbeat(stale), NotNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 
 		// region is updated.
@@ -388,7 +388,7 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 			ConfVer: epoch.GetConfVer() + 1,
 		}
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 
 		// region is stale (ConfVer).
@@ -397,7 +397,7 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 			Version: epoch.GetVersion() + 1,
 		}
 		c.Assert(cache.handleRegionHeartbeat(stale), NotNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 		checkRegionsKV(c, cache.kv, regions[:i+1])
 
 		// Add a down peer.
@@ -408,22 +408,22 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 			},
 		}
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 
 		// Add a pending peer.
 		region.PendingPeers = []*metapb.Peer{region.Peers[rand.Intn(len(region.Peers))]}
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 
 		// Clear down peers.
 		region.DownPeers = nil
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 
 		// Clear pending peers.
 		region.PendingPeers = nil
 		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
-		checkRegions(c, cache.Regions, regions[:i+1])
+		checkRegions(c, cache.core.Regions, regions[:i+1])
 	}
 
 	regionCounts := make(map[uint64]int)
@@ -453,11 +453,11 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 		}
 	}
 
-	for _, store := range cache.Stores.GetStores() {
-		c.Assert(store.LeaderCount, Equals, cache.Regions.GetStoreLeaderCount(store.GetId()))
-		c.Assert(store.RegionCount, Equals, cache.Regions.GetStoreRegionCount(store.GetId()))
-		c.Assert(store.LeaderSize, Equals, cache.Regions.GetStoreLeaderRegionSize(store.GetId()))
-		c.Assert(store.RegionSize, Equals, cache.Regions.GetStoreRegionSize(store.GetId()))
+	for _, store := range cache.core.Stores.GetStores() {
+		c.Assert(store.LeaderCount, Equals, cache.core.Regions.GetStoreLeaderCount(store.GetId()))
+		c.Assert(store.RegionCount, Equals, cache.core.Regions.GetStoreRegionCount(store.GetId()))
+		c.Assert(store.LeaderSize, Equals, cache.core.Regions.GetStoreLeaderRegionSize(store.GetId()))
+		c.Assert(store.RegionSize, Equals, cache.core.Regions.GetStoreRegionSize(store.GetId()))
 	}
 
 	// Test with kv.
@@ -600,7 +600,7 @@ func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
 
 func checkPendingPeerCount(expect []int, cache *clusterInfo, c *C) {
 	for i, e := range expect {
-		s := cache.Stores.GetStore(uint64(i + 1))
+		s := cache.core.Stores.GetStore(uint64(i + 1))
 		c.Assert(s.PendingPeerCount, Equals, e)
 	}
 }
diff --git a/server/cluster_worker_test.go b/server/cluster_worker_test.go
index 2b1d1cd8334..62fccee49a9 100644
--- a/server/cluster_worker_test.go
+++ b/server/cluster_worker_test.go
@@ -360,7 +360,7 @@ func (s *testClusterWorkerSuite) checkSearchRegions(cluster *RaftCluster, keys .
 	cluster.cachedCluster.RLock()
 	defer cluster.cachedCluster.RUnlock()
 
-	cacheRegions := cluster.cachedCluster.Regions
+	cacheRegions := cluster.cachedCluster.core.Regions
 	if cacheRegions.TreeLength() != len(keys)/2 {
 		c.Logf("region length not match, expect %v, got %v", len(keys)/2, cacheRegions.TreeLength())
 		return false