From 3581057aae58a446f08d70fea9906631102d5721 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 16 Apr 2018 19:59:05 +0800 Subject: [PATCH 1/5] *: add metrics for hotspot cache --- server/coordinator.go | 3 ++ server/schedule/hot_cache.go | 66 +++++++++++++++++++----------------- server/schedule/metrics.go | 8 +++++ 3 files changed, 46 insertions(+), 31 deletions(-) diff --git a/server/coordinator.go b/server/coordinator.go index 62a61b6270a..2b9a5e1b8ca 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -344,6 +344,9 @@ func (c *coordinator) collectHotSpotMetrics() { hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(0) } } + + // collect hot cache metrics + c.cluster.HotCache.CollectMetrics() } func (c *coordinator) shouldRun() bool { diff --git a/server/schedule/hot_cache.go b/server/schedule/hot_cache.go index d772831fa0b..83dda5547ba 100644 --- a/server/schedule/hot_cache.go +++ b/server/schedule/hot_cache.go @@ -45,10 +45,14 @@ func newHotSpotCache() *HotSpotCache { // CheckWrite checks the write status, returns whether need update statistics and item. func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { - var WrittenBytesPerSec uint64 + var ( + WrittenBytesPerSec uint64 + value *core.RegionStat + ) v, isExist := w.writeFlow.Peek(region.GetId()) if isExist && !Simulating { - interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds() + value = v.(*core.RegionStat) + interval := time.Since(value.LastUpdateTime).Seconds() if interval < minHotRegionReportInterval { return false, nil } @@ -56,7 +60,6 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn } else { WrittenBytesPerSec = uint64(float64(region.WrittenBytes) / float64(RegionHeartBeatReportInterval)) } - region.WrittenBytes = WrittenBytesPerSec // hotRegionThreshold is use to pick hot region // suppose the number of the hot Regions is statCacheMaxLen @@ -68,15 +71,19 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn if hotRegionThreshold < hotWriteRegionMinFlowRate { hotRegionThreshold = hotWriteRegionMinFlowRate } - return w.isNeedUpdateStatCache(region, hotRegionThreshold, WriteFlow) + return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow) } // CheckRead checks the read status, returns whether need update statistics and item. 
func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { - var ReadBytesPerSec uint64 + var ( + ReadBytesPerSec uint64 + value *core.RegionStat + ) v, isExist := w.readFlow.Peek(region.GetId()) if isExist && !Simulating { - interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds() + value = v.(*core.RegionStat) + interval := time.Since(value.LastUpdateTime).Seconds() if interval < minHotRegionReportInterval { return false, nil } @@ -95,26 +102,19 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf if hotRegionThreshold < hotReadRegionMinFlowRate { hotRegionThreshold = hotReadRegionMinFlowRate } - return w.isNeedUpdateStatCache(region, hotRegionThreshold, ReadFlow) + return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) } -func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionThreshold uint64, kind FlowKind) (bool, *core.RegionStat) { - var ( - v *core.RegionStat - value interface{} - isExist bool - flowBytes uint64 - ) - key := region.GetId() - +func (w *HotSpotCache) incMetrics(name string, kind FlowKind) { switch kind { case WriteFlow: - value, isExist = w.writeFlow.Peek(key) - flowBytes = region.WrittenBytes + hotCacheStatusGauge.WithLabelValues(name, "write").Inc() case ReadFlow: - value, isExist = w.readFlow.Peek(key) - flowBytes = region.ReadBytes + hotCacheStatusGauge.WithLabelValues(name, "read").Inc() } +} + +func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, v *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { newItem := &core.RegionStat{ RegionID: region.GetId(), FlowBytes: flowBytes, @@ -124,22 +124,18 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionT AntiCount: hotRegionAntiCount, } - if isExist { - v = value.(*core.RegionStat) + if v != nil { newItem.HotDegree = v.HotDegree + 1 } - switch kind { - case WriteFlow: - if region.WrittenBytes >= hotRegionThreshold { - return true, newItem - } - case ReadFlow: - if region.ReadBytes >= hotRegionThreshold { - return true, newItem + if flowBytes >= hotRegionThreshold { + if v == nil { + w.incMetrics("add_item", kind) } + return true, newItem } // smaller than hotReionThreshold - if !isExist { + if v == nil { + w.incMetrics("remove_item", kind) return false, newItem } if v.AntiCount <= 0 { @@ -160,12 +156,14 @@ func (w *HotSpotCache) Update(key uint64, item *core.RegionStat, kind FlowKind) w.writeFlow.Remove(key) } else { w.writeFlow.Put(key, item) + w.incMetrics("update_item", kind) } case ReadFlow: if item == nil { w.readFlow.Remove(key) } else { w.readFlow.Put(key, item) + w.incMetrics("update_item", kind) } } } @@ -197,6 +195,12 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot return nil } +// CollectMetrics collect the hot cache metrics +func (w *HotSpotCache) CollectMetrics() { + hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(float64(w.writeFlow.Len())) + hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(float64(w.readFlow.Len())) +} + func (w *HotSpotCache) isRegionHot(id uint64, hotThreshold int) bool { if stat, ok := w.writeFlow.Peek(id); ok { if stat.(*core.RegionStat).HotDegree >= hotThreshold { diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index e6566969fd6..a9413da89dd 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -32,9 +32,17 @@ var ( Help: 
"Bucketed histogram of processing time (s) of finished operator step.", Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), }, []string{"type"}) + hotCacheStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "hotcache", + Name: "status", + Help: "Status of the hotspot.", + }, []string{"name", "type"}) ) func init() { prometheus.MustRegister(checkerCounter) prometheus.MustRegister(operatorStepDuration) + prometheus.MustRegister(hotCacheStatusGauge) } From f2d5a197843f0fcfd89d22c132725594ccda6bcb Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 17 Apr 2018 11:26:28 +0800 Subject: [PATCH 2/5] address comments --- server/coordinator.go | 2 +- server/schedule/hot_cache.go | 57 ++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/server/coordinator.go b/server/coordinator.go index 2b9a5e1b8ca..4fbed132deb 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -346,7 +346,7 @@ func (c *coordinator) collectHotSpotMetrics() { } // collect hot cache metrics - c.cluster.HotCache.CollectMetrics() + c.cluster.HotCache.CollectMetrics(c.cluster.Stores) } func (c *coordinator) shouldRun() bool { diff --git a/server/schedule/hot_cache.go b/server/schedule/hot_cache.go index 83dda5547ba..4a7eba7f7c9 100644 --- a/server/schedule/hot_cache.go +++ b/server/schedule/hot_cache.go @@ -61,16 +61,7 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn WrittenBytesPerSec = uint64(float64(region.WrittenBytes) / float64(RegionHeartBeatReportInterval)) } - // hotRegionThreshold is use to pick hot region - // suppose the number of the hot Regions is statCacheMaxLen - // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - // divide 2 because the store reports data about two times than the region record write to rocksdb - divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval - hotRegionThreshold := uint64(float64(stores.TotalWrittenBytes()) / divisor) - - if hotRegionThreshold < hotWriteRegionMinFlowRate { - hotRegionThreshold = hotWriteRegionMinFlowRate - } + hotRegionThreshold := calculateWriteHotThreshold(stores) return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow) } @@ -92,16 +83,7 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf ReadBytesPerSec = uint64(float64(region.ReadBytes) / float64(RegionHeartBeatReportInterval)) } region.ReadBytes = ReadBytesPerSec - - // hotRegionThreshold is use to pick hot region - // suppose the number of the hot Regions is statLRUMaxLen - // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - divisor := float64(statCacheMaxLen) * storeHeartBeatReportInterval - hotRegionThreshold := uint64(float64(stores.TotalReadBytes()) / divisor) - - if hotRegionThreshold < hotReadRegionMinFlowRate { - hotRegionThreshold = hotReadRegionMinFlowRate - } + hotRegionThreshold := calculateReadHotThreshold(stores) return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) } @@ -114,6 +96,33 @@ func (w *HotSpotCache) incMetrics(name string, kind FlowKind) { } } +func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 { + // hotRegionThreshold is use to pick hot region + // suppose the number of the hot Regions is statCacheMaxLen + // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of 
hot Regions + // divide 2 because the store reports data about two times than the region record write to rocksdb + divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval + hotRegionThreshold := uint64(float64(stores.TotalWrittenBytes()) / divisor) + + if hotRegionThreshold < hotWriteRegionMinFlowRate { + hotRegionThreshold = hotWriteRegionMinFlowRate + } + return hotRegionThreshold +} + +func calculateReadHotThreshold(stores *core.StoresInfo) uint64 { + // hotRegionThreshold is use to pick hot region + // suppose the number of the hot Regions is statLRUMaxLen + // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions + divisor := float64(statCacheMaxLen) * storeHeartBeatReportInterval + hotRegionThreshold := uint64(float64(stores.TotalReadBytes()) / divisor) + + if hotRegionThreshold < hotReadRegionMinFlowRate { + hotRegionThreshold = hotReadRegionMinFlowRate + } + return hotRegionThreshold +} + func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, v *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { newItem := &core.RegionStat{ RegionID: region.GetId(), @@ -135,10 +144,10 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes } // smaller than hotReionThreshold if v == nil { - w.incMetrics("remove_item", kind) return false, newItem } if v.AntiCount <= 0 { + w.incMetrics("remove_item", kind) return true, nil } // eliminate some noise @@ -196,9 +205,13 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot } // CollectMetrics collect the hot cache metrics -func (w *HotSpotCache) CollectMetrics() { +func (w *HotSpotCache) CollectMetrics(stores *core.StoresInfo) { hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(float64(w.writeFlow.Len())) hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(float64(w.readFlow.Len())) + threshold := calculateWriteHotThreshold(stores) + hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(threshold)) + threshold = calculateReadHotThreshold(stores) + hotCacheStatusGauge.WithLabelValues("hotThreshold", "read").Set(float64(threshold)) } func (w *HotSpotCache) isRegionHot(id uint64, hotThreshold int) bool { From be57eaf1de2d21ec836e64634e2cd4e784050f0d Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 17 Apr 2018 14:31:21 +0800 Subject: [PATCH 3/5] address comment --- server/schedule/hot_cache.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/schedule/hot_cache.go b/server/schedule/hot_cache.go index 4a7eba7f7c9..45841115b86 100644 --- a/server/schedule/hot_cache.go +++ b/server/schedule/hot_cache.go @@ -82,7 +82,6 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf } else { ReadBytesPerSec = uint64(float64(region.ReadBytes) / float64(RegionHeartBeatReportInterval)) } - region.ReadBytes = ReadBytesPerSec hotRegionThreshold := calculateReadHotThreshold(stores) return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) } @@ -123,7 +122,7 @@ func calculateReadHotThreshold(stores *core.StoresInfo) uint64 { return hotRegionThreshold } -func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, v *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { +func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold 
uint64, oldItem *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { newItem := &core.RegionStat{ RegionID: region.GetId(), FlowBytes: flowBytes, @@ -133,27 +132,27 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes AntiCount: hotRegionAntiCount, } - if v != nil { - newItem.HotDegree = v.HotDegree + 1 + if oldItem != nil { + newItem.HotDegree = oldItem.HotDegree + 1 } if flowBytes >= hotRegionThreshold { - if v == nil { + if oldItem == nil { w.incMetrics("add_item", kind) } return true, newItem } // smaller than hotReionThreshold - if v == nil { + if oldItem == nil { return false, newItem } - if v.AntiCount <= 0 { + if oldItem.AntiCount <= 0 { w.incMetrics("remove_item", kind) return true, nil } // eliminate some noise - newItem.HotDegree = v.HotDegree - 1 - newItem.AntiCount = v.AntiCount - 1 - newItem.FlowBytes = v.FlowBytes + newItem.HotDegree = oldItem.HotDegree - 1 + newItem.AntiCount = oldItem.AntiCount - 1 + newItem.FlowBytes = oldItem.FlowBytes return true, newItem } From 06981e7bcc4ce5d904e0866592e499c906fe63d3 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 19 Apr 2018 10:55:12 +0800 Subject: [PATCH 4/5] fix test --- server/schedulers/balance_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 193c3204a4d..cdda86fc9b9 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -986,7 +986,7 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { r := tc.RandHotRegionFromStore(2, schedule.ReadFlow) c.Assert(r, NotNil) c.Assert(r.GetId(), Equals, uint64(2)) - c.Assert(r.ReadBytes, Equals, uint64(512*1024)) + c.Assert(r.ReadBytes, Equals, uint64(512*1024*schedule.RegionHeartBeatReportInterval)) // check hot items stats := tc.HotCache.RegionStats(schedule.ReadFlow) c.Assert(len(stats), Equals, 3) From e2bbce821dfc122b332b8222b76df9715ed1da40 Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 24 Apr 2018 16:23:46 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E3=80=80address=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/schedule/metrics.go | 1 + server/schedulers/balance_test.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index a9413da89dd..1cff894513a 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -32,6 +32,7 @@ var ( Help: "Bucketed histogram of processing time (s) of finished operator step.", Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), }, []string{"type"}) + hotCacheStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index cdda86fc9b9..139d91d33ca 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -986,7 +986,6 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { r := tc.RandHotRegionFromStore(2, schedule.ReadFlow) c.Assert(r, NotNil) c.Assert(r.GetId(), Equals, uint64(2)) - c.Assert(r.ReadBytes, Equals, uint64(512*1024*schedule.RegionHeartBeatReportInterval)) // check hot items stats := tc.HotCache.RegionStats(schedule.ReadFlow) c.Assert(len(stats), Equals, 3)
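
The patches above wire three kinds of signals into one Prometheus gauge: per-event counters (`add_item`/`remove_item`/`update_item`), cache lengths, and the computed hot-region thresholds. The sketch below is a minimal, self-contained illustration of that pattern, not the PD implementation itself: the constants and the `totalWrittenBytes` input are assumed example values rather than anything taken from the diff, and the metric names simply mirror the ones added here.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Gauge with the same label scheme as the patch: "name" is the statistic,
// "type" is the flow kind ("write" or "read").
var hotCacheStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "hotcache",
		Name:      "status",
		Help:      "Status of the hotspot.",
	}, []string{"name", "type"})

// Assumed example constants; the real values live elsewhere in the pd
// scheduler package and are not part of this patch.
const (
	statCacheMaxLen              = 1000
	storeHeartBeatReportInterval = 10
	hotWriteRegionMinFlowRate    = 16 * 1024
)

// calculateWriteHotThreshold follows the comment in PATCH 2/5: split the total
// write flow of all stores across statCacheMaxLen hot regions, halve it because
// stores report roughly twice the bytes that regions record, and floor the
// result at a minimum flow rate.
func calculateWriteHotThreshold(totalWrittenBytes uint64) uint64 {
	divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval
	threshold := uint64(float64(totalWrittenBytes) / divisor)
	if threshold < hotWriteRegionMinFlowRate {
		threshold = hotWriteRegionMinFlowRate
	}
	return threshold
}

func main() {
	prometheus.MustRegister(hotCacheStatusGauge)

	// Event-style metric, as incMetrics does when an item enters the cache.
	hotCacheStatusGauge.WithLabelValues("add_item", "write").Inc()

	// Snapshot-style metric, as CollectMetrics does on each collect cycle.
	totalWrittenBytes := uint64(1 << 30) // hypothetical 1 GiB/s of total store write flow
	threshold := calculateWriteHotThreshold(totalWrittenBytes)
	hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(threshold))

	fmt.Println("write hot threshold (bytes/s):", threshold) // ~53687 with the assumed constants
}
```

Keeping "name" and "type" as the only label dimensions bounds the series count: new statistics such as the thresholds added in PATCH 2/5 reuse the same gauge instead of registering another collector.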