Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Do not use anonymous field in clusterInfo #1094

Merged
merged 4 commits into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 62 additions & 52 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

type clusterInfo struct {
sync.RWMutex
*schedule.BasicCluster
core *schedule.BasicCluster

id core.IDAllocator
kv *core.KV
Expand All @@ -42,7 +42,7 @@ type clusterInfo struct {

func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo {
return &clusterInfo{
BasicCluster: schedule.NewBasicCluster(),
core: schedule.NewBasicCluster(),
id: id,
opt: opt,
kv: kv,
Expand All @@ -64,16 +64,16 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
}

start := time.Now()
if err := kv.LoadStores(c.Stores); err != nil {
if err := kv.LoadStores(c.core.Stores); err != nil {
return nil, errors.Trace(err)
}
log.Infof("load %v stores cost %v", c.Stores.GetStoreCount(), time.Since(start))
log.Infof("load %v stores cost %v", c.core.Stores.GetStoreCount(), time.Since(start))

start = time.Now()
if err := kv.LoadRegions(c.Regions); err != nil {
if err := kv.LoadRegions(c.core.Regions); err != nil {
return nil, errors.Trace(err)
}
log.Infof("load %v regions cost %v", c.Regions.GetRegionCount(), time.Since(start))
log.Infof("load %v regions cost %v", c.core.Regions.GetRegionCount(), time.Since(start))

return c, nil
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (c *clusterInfo) putMetaLocked(meta *metapb.Cluster) error {
func (c *clusterInfo) GetStore(storeID uint64) *core.StoreInfo {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.GetStore(storeID)
return c.core.GetStore(storeID)
}

func (c *clusterInfo) putStore(store *core.StoreInfo) error {
Expand All @@ -143,97 +143,97 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
return errors.Trace(err)
}
}
return c.BasicCluster.PutStore(store)
return c.core.PutStore(store)
}

// BlockStore stops balancer from selecting the store.
func (c *clusterInfo) BlockStore(storeID uint64) error {
c.Lock()
defer c.Unlock()
return c.BasicCluster.BlockStore(storeID)
return c.core.BlockStore(storeID)
}

// UnblockStore allows balancer to select the store.
func (c *clusterInfo) UnblockStore(storeID uint64) {
c.Lock()
defer c.Unlock()
c.BasicCluster.UnblockStore(storeID)
c.core.UnblockStore(storeID)
}

// GetStores returns all stores in the cluster.
func (c *clusterInfo) GetStores() []*core.StoreInfo {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.GetStores()
return c.core.GetStores()
}

func (c *clusterInfo) getMetaStores() []*metapb.Store {
c.RLock()
defer c.RUnlock()
return c.Stores.GetMetaStores()
return c.core.Stores.GetMetaStores()
}

func (c *clusterInfo) getStoreCount() int {
c.RLock()
defer c.RUnlock()
return c.Stores.GetStoreCount()
return c.core.Stores.GetStoreCount()
}

func (c *clusterInfo) getStoresWriteStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
return c.Stores.GetStoresWriteStat()
return c.core.Stores.GetStoresWriteStat()
}

func (c *clusterInfo) getStoresReadStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
return c.Stores.GetStoresReadStat()
return c.core.Stores.GetStoresReadStat()
}

// ScanRegions scans region with start key, until number greater than limit.
func (c *clusterInfo) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.Regions.ScanRange(startKey, limit)
return c.core.Regions.ScanRange(startKey, limit)
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (c *clusterInfo) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.GetAdjacentRegions(region)
return c.core.GetAdjacentRegions(region)
}

// GetRegion searches for a region by ID.
func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.GetRegion(regionID)
return c.core.GetRegion(regionID)
}

// IsRegionHot checks if a region is in hot state.
func (c *clusterInfo) IsRegionHot(id uint64) bool {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.IsRegionHot(id, c.GetHotRegionLowThreshold())
return c.core.IsRegionHot(id, c.GetHotRegionLowThreshold())
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
func (c *clusterInfo) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
r := c.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
if r == nil {
return nil
}
return c.BasicCluster.GetRegion(r.RegionID)
return c.core.GetRegion(r.RegionID)
}

func (c *clusterInfo) searchRegion(regionKey []byte) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.Regions.SearchRegion(regionKey)
return c.core.Regions.SearchRegion(regionKey)
}

func (c *clusterInfo) putRegion(region *core.RegionInfo) error {
Expand All @@ -248,71 +248,71 @@ func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error {
return errors.Trace(err)
}
}
return c.BasicCluster.PutRegion(region)
return c.core.PutRegion(region)
}

func (c *clusterInfo) getRegions() []*core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.Regions.GetRegions()
return c.core.Regions.GetRegions()
}

func (c *clusterInfo) randomRegion(opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.Regions.RandRegion(opts...)
return c.core.Regions.RandRegion(opts...)
}

func (c *clusterInfo) getMetaRegions() []*metapb.Region {
c.RLock()
defer c.RUnlock()
return c.Regions.GetMetaRegions()
return c.core.Regions.GetMetaRegions()
}

func (c *clusterInfo) getRegionCount() int {
c.RLock()
defer c.RUnlock()
return c.Regions.GetRegionCount()
return c.core.Regions.GetRegionCount()
}

func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *core.RegionStats {
c.RLock()
defer c.RUnlock()
return c.Regions.GetRegionStats(startKey, endKey)
return c.core.Regions.GetRegionStats(startKey, endKey)
}

func (c *clusterInfo) dropRegion(id uint64) {
c.Lock()
defer c.Unlock()
if region := c.BasicCluster.GetRegion(id); region != nil {
c.Regions.RemoveRegion(region)
if region := c.core.GetRegion(id); region != nil {
c.core.Regions.RemoveRegion(region)
}
}

func (c *clusterInfo) getStoreRegionCount(storeID uint64) int {
c.RLock()
defer c.RUnlock()
return c.Regions.GetStoreRegionCount(storeID)
return c.core.Regions.GetStoreRegionCount(storeID)
}

func (c *clusterInfo) getStoreLeaderCount(storeID uint64) int {
c.RLock()
defer c.RUnlock()
return c.Regions.GetStoreLeaderCount(storeID)
return c.core.Regions.GetStoreLeaderCount(storeID)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *clusterInfo) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.RandLeaderRegion(storeID, opts...)
return c.core.RandLeaderRegion(storeID, opts...)
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.BasicCluster.RandFollowerRegion(storeID, opts...)
return c.core.RandFollowerRegion(storeID, opts...)
}

// GetRegionStores returns all stores that contains the region's peer.
Expand All @@ -325,7 +325,7 @@ func (c *clusterInfo) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
func (c *clusterInfo) getRegionStores(region *core.RegionInfo) []*core.StoreInfo {
var stores []*core.StoreInfo
for id := range region.GetStoreIds() {
if store := c.Stores.GetStore(id); store != nil {
if store := c.core.Stores.GetStore(id); store != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename getRegionStores to getRegionStoresLocked?

stores = append(stores, store)
}
}
Expand All @@ -336,7 +336,7 @@ func (c *clusterInfo) getRegionStores(region *core.RegionInfo) []*core.StoreInfo
func (c *clusterInfo) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
c.RLock()
defer c.RUnlock()
return c.Stores.GetStore(region.Leader.GetStoreId())
return c.core.Stores.GetStore(region.Leader.GetStoreId())
}

// GetFollowerStores returns all stores that contains the region's follower peer.
Expand All @@ -345,7 +345,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionInfo) []*core.StoreIn
defer c.RUnlock()
var stores []*core.StoreInfo
for id := range region.GetFollowers() {
if store := c.Stores.GetStore(id); store != nil {
if store := c.core.Stores.GetStore(id); store != nil {
stores = append(stores, store)
}
}
Expand All @@ -356,7 +356,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionInfo) []*core.StoreIn
func (c *clusterInfo) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return float64(c.Regions.Length())*collectFactor <= float64(c.activeRegions)
return float64(c.core.Regions.Length())*collectFactor <= float64(c.activeRegions)
}

// handleStoreHeartbeat updates the store status.
Expand All @@ -365,32 +365,32 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
defer c.Unlock()

storeID := stats.GetStoreId()
store := c.Stores.GetStore(storeID)
store := c.core.Stores.GetStore(storeID)
if store == nil {
return errors.Trace(core.ErrStoreNotFound(storeID))
}
store.Stats = proto.Clone(stats).(*pdpb.StoreStats)
store.LastHeartbeatTS = time.Now()

c.Stores.SetStore(store)
c.core.Stores.SetStore(store)
return nil
}

func (c *clusterInfo) updateStoreStatus(id uint64) {
c.Stores.SetLeaderCount(id, c.Regions.GetStoreLeaderCount(id))
c.Stores.SetRegionCount(id, c.Regions.GetStoreRegionCount(id))
c.Stores.SetPendingPeerCount(id, c.Regions.GetStorePendingPeerCount(id))
c.Stores.SetLeaderSize(id, c.Regions.GetStoreLeaderRegionSize(id))
c.Stores.SetRegionSize(id, c.Regions.GetStoreRegionSize(id))
c.core.Stores.SetLeaderCount(id, c.core.Regions.GetStoreLeaderCount(id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename updateStoreStatus to updateStoreStatusLocked?

c.core.Stores.SetRegionCount(id, c.core.Regions.GetStoreRegionCount(id))
c.core.Stores.SetPendingPeerCount(id, c.core.Regions.GetStorePendingPeerCount(id))
c.core.Stores.SetLeaderSize(id, c.core.Regions.GetStoreLeaderRegionSize(id))
c.core.Stores.SetRegionSize(id, c.core.Regions.GetStoreRegionSize(id))
}

// handleRegionHeartbeat updates the region information.
func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
region = region.Clone()
c.RLock()
origin := c.Regions.GetRegion(region.GetId())
isWriteUpdate, writeItem := c.CheckWriteStatus(region)
isReadUpdate, readItem := c.CheckReadStatus(region)
origin := c.core.Regions.GetRegion(region.GetId())
isWriteUpdate, writeItem := c.core.CheckWriteStatus(region)
isReadUpdate, readItem := c.core.CheckReadStatus(region)
c.RUnlock()

// Save to KV if meta is updated.
Expand Down Expand Up @@ -451,7 +451,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
}

if saveCache {
overlaps := c.Regions.SetRegion(region)
overlaps := c.core.Regions.SetRegion(region)
if c.kv != nil {
for _, item := range overlaps {
if err := c.kv.DeleteRegion(item); err != nil {
Expand Down Expand Up @@ -483,10 +483,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {

key := region.GetId()
if isWriteUpdate {
c.HotCache.Update(key, writeItem, schedule.WriteFlow)
c.core.HotCache.Update(key, writeItem, schedule.WriteFlow)
}
if isReadUpdate {
c.HotCache.Update(key, readItem, schedule.ReadFlow)
c.core.HotCache.Update(key, readItem, schedule.ReadFlow)
}
return nil
}
Expand All @@ -508,7 +508,7 @@ func (c *clusterInfo) collectMetrics() {
c.regionStats.Collect()
c.labelLevelStats.Collect()
// collect hot cache metrics
c.HotCache.CollectMetrics(c.Stores)
c.core.HotCache.CollectMetrics(c.core.Stores)
}

func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo {
Expand Down Expand Up @@ -595,3 +595,13 @@ func (c *clusterInfo) IsRaftLearnerEnabled() bool {
func (c *clusterInfo) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return c.opt.CheckLabelProperty(typ, labels)
}

// RegionReadStats returns hot region's read stats.
func (c *clusterInfo) RegionReadStats() []*core.RegionStat {
return c.core.HotCache.RegionStats(schedule.ReadFlow)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need lock?

}

// RegionWriteStats returns hot region's write stats.
func (c *clusterInfo) RegionWriteStats() []*core.RegionStat {
return c.core.HotCache.RegionStats(schedule.WriteFlow)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need lock?

}
Loading