diff --git a/server/coordinator.go b/server/coordinator.go index 897a5f71255..b27c4af1b86 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -349,6 +349,9 @@ func (c *coordinator) collectHotSpotMetrics() { hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(0) } } + + // collect hot cache metrics + c.cluster.HotCache.CollectMetrics(c.cluster.Stores) } func (c *coordinator) shouldRun() bool { diff --git a/server/schedule/hot_cache.go b/server/schedule/hot_cache.go index d772831fa0b..45841115b86 100644 --- a/server/schedule/hot_cache.go +++ b/server/schedule/hot_cache.go @@ -45,10 +45,14 @@ func newHotSpotCache() *HotSpotCache { // CheckWrite checks the write status, returns whether need update statistics and item. func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { - var WrittenBytesPerSec uint64 + var ( + WrittenBytesPerSec uint64 + value *core.RegionStat + ) v, isExist := w.writeFlow.Peek(region.GetId()) if isExist && !Simulating { - interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds() + value = v.(*core.RegionStat) + interval := time.Since(value.LastUpdateTime).Seconds() if interval < minHotRegionReportInterval { return false, nil } @@ -56,27 +60,21 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn } else { WrittenBytesPerSec = uint64(float64(region.WrittenBytes) / float64(RegionHeartBeatReportInterval)) } - region.WrittenBytes = WrittenBytesPerSec - - // hotRegionThreshold is use to pick hot region - // suppose the number of the hot Regions is statCacheMaxLen - // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - // divide 2 because the store reports data about two times than the region record write to rocksdb - divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval - hotRegionThreshold := uint64(float64(stores.TotalWrittenBytes()) / divisor) - if hotRegionThreshold < hotWriteRegionMinFlowRate { - hotRegionThreshold = hotWriteRegionMinFlowRate - } - return w.isNeedUpdateStatCache(region, hotRegionThreshold, WriteFlow) + hotRegionThreshold := calculateWriteHotThreshold(stores) + return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow) } // CheckRead checks the read status, returns whether need update statistics and item. func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { - var ReadBytesPerSec uint64 + var ( + ReadBytesPerSec uint64 + value *core.RegionStat + ) v, isExist := w.readFlow.Peek(region.GetId()) if isExist && !Simulating { - interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds() + value = v.(*core.RegionStat) + interval := time.Since(value.LastUpdateTime).Seconds() if interval < minHotRegionReportInterval { return false, nil } @@ -84,8 +82,34 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf } else { ReadBytesPerSec = uint64(float64(region.ReadBytes) / float64(RegionHeartBeatReportInterval)) } - region.ReadBytes = ReadBytesPerSec + hotRegionThreshold := calculateReadHotThreshold(stores) + return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) +} + +func (w *HotSpotCache) incMetrics(name string, kind FlowKind) { + switch kind { + case WriteFlow: + hotCacheStatusGauge.WithLabelValues(name, "write").Inc() + case ReadFlow: + hotCacheStatusGauge.WithLabelValues(name, "read").Inc() + } +} + +func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 { + // hotRegionThreshold is use to pick hot region + // suppose the number of the hot Regions is statCacheMaxLen + // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions + // divide 2 because the store reports data about two times than the region record write to rocksdb + divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval + hotRegionThreshold := uint64(float64(stores.TotalWrittenBytes()) / divisor) + + if hotRegionThreshold < hotWriteRegionMinFlowRate { + hotRegionThreshold = hotWriteRegionMinFlowRate + } + return hotRegionThreshold +} +func calculateReadHotThreshold(stores *core.StoresInfo) uint64 { // hotRegionThreshold is use to pick hot region // suppose the number of the hot Regions is statLRUMaxLen // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions @@ -95,26 +119,10 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf if hotRegionThreshold < hotReadRegionMinFlowRate { hotRegionThreshold = hotReadRegionMinFlowRate } - return w.isNeedUpdateStatCache(region, hotRegionThreshold, ReadFlow) + return hotRegionThreshold } -func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionThreshold uint64, kind FlowKind) (bool, *core.RegionStat) { - var ( - v *core.RegionStat - value interface{} - isExist bool - flowBytes uint64 - ) - key := region.GetId() - - switch kind { - case WriteFlow: - value, isExist = w.writeFlow.Peek(key) - flowBytes = region.WrittenBytes - case ReadFlow: - value, isExist = w.readFlow.Peek(key) - flowBytes = region.ReadBytes - } +func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, oldItem *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { newItem := &core.RegionStat{ RegionID: region.GetId(), FlowBytes: flowBytes, @@ -124,31 +132,27 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionT AntiCount: hotRegionAntiCount, } - if isExist { - v = value.(*core.RegionStat) - newItem.HotDegree = v.HotDegree + 1 + if oldItem != nil { + newItem.HotDegree = oldItem.HotDegree + 1 } - switch kind { - case WriteFlow: - if region.WrittenBytes >= hotRegionThreshold { - return true, newItem - } - case ReadFlow: - if region.ReadBytes >= hotRegionThreshold { - return true, newItem + if flowBytes >= hotRegionThreshold { + if oldItem == nil { + w.incMetrics("add_item", kind) } + return true, newItem } // smaller than hotReionThreshold - if !isExist { + if oldItem == nil { return false, newItem } - if v.AntiCount <= 0 { + if oldItem.AntiCount <= 0 { + w.incMetrics("remove_item", kind) return true, nil } // eliminate some noise - newItem.HotDegree = v.HotDegree - 1 - newItem.AntiCount = v.AntiCount - 1 - newItem.FlowBytes = v.FlowBytes + newItem.HotDegree = oldItem.HotDegree - 1 + newItem.AntiCount = oldItem.AntiCount - 1 + newItem.FlowBytes = oldItem.FlowBytes return true, newItem } @@ -160,12 +164,14 @@ func (w *HotSpotCache) Update(key uint64, item *core.RegionStat, kind FlowKind) w.writeFlow.Remove(key) } else { w.writeFlow.Put(key, item) + w.incMetrics("update_item", kind) } case ReadFlow: if item == nil { w.readFlow.Remove(key) } else { w.readFlow.Put(key, item) + w.incMetrics("update_item", kind) } } } @@ -197,6 +203,16 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot return nil } +// CollectMetrics collect the hot cache metrics +func (w *HotSpotCache) CollectMetrics(stores *core.StoresInfo) { + hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(float64(w.writeFlow.Len())) + hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(float64(w.readFlow.Len())) + threshold := calculateWriteHotThreshold(stores) + hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(threshold)) + threshold = calculateReadHotThreshold(stores) + hotCacheStatusGauge.WithLabelValues("hotThreshold", "read").Set(float64(threshold)) +} + func (w *HotSpotCache) isRegionHot(id uint64, hotThreshold int) bool { if stat, ok := w.writeFlow.Peek(id); ok { if stat.(*core.RegionStat).HotDegree >= hotThreshold { diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index e6566969fd6..1cff894513a 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -32,9 +32,18 @@ var ( Help: "Bucketed histogram of processing time (s) of finished operator step.", Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), }, []string{"type"}) + + hotCacheStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "hotcache", + Name: "status", + Help: "Status of the hotspot.", + }, []string{"name", "type"}) ) func init() { prometheus.MustRegister(checkerCounter) prometheus.MustRegister(operatorStepDuration) + prometheus.MustRegister(hotCacheStatusGauge) } diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 3c1cc80d783..d11e8be8145 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -998,7 +998,6 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { r := tc.RandHotRegionFromStore(2, schedule.ReadFlow) c.Assert(r, NotNil) c.Assert(r.GetId(), Equals, uint64(2)) - c.Assert(r.ReadBytes, Equals, uint64(512*1024)) // check hot items stats := tc.HotCache.RegionStats(schedule.ReadFlow) c.Assert(len(stats), Equals, 3)