Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add metrics for hotspot cache #1027

Merged
merged 6 commits into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (c *coordinator) collectHotSpotMetrics() {
hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(0)
}
}

// collect hot cache metrics
c.cluster.HotCache.CollectMetrics(c.cluster.Stores)
}

func (c *coordinator) shouldRun() bool {
Expand Down
118 changes: 67 additions & 51 deletions server/schedule/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,71 @@ func newHotSpotCache() *HotSpotCache {

// CheckWrite checks the write status, returns whether need update statistics and item.
func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) {
var WrittenBytesPerSec uint64
var (
WrittenBytesPerSec uint64
value *core.RegionStat
)
v, isExist := w.writeFlow.Peek(region.GetId())
if isExist && !Simulating {
interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds()
value = v.(*core.RegionStat)
interval := time.Since(value.LastUpdateTime).Seconds()
if interval < minHotRegionReportInterval {
return false, nil
}
WrittenBytesPerSec = uint64(float64(region.WrittenBytes) / interval)
} else {
WrittenBytesPerSec = uint64(float64(region.WrittenBytes) / float64(RegionHeartBeatReportInterval))
}
region.WrittenBytes = WrittenBytesPerSec

// hotRegionThreshold is used to pick hot regions
// suppose the number of the hot Regions is statCacheMaxLen
// and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions
// divide 2 because the store reports data about two times than the region record write to rocksdb
divisor := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval
hotRegionThreshold := uint64(float64(stores.TotalWrittenBytes()) / divisor)

if hotRegionThreshold < hotWriteRegionMinFlowRate {
hotRegionThreshold = hotWriteRegionMinFlowRate
}
return w.isNeedUpdateStatCache(region, hotRegionThreshold, WriteFlow)
hotRegionThreshold := calculateWriteHotThreshold(stores)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing region.WrittenBytes = WrittenBytesPerSec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary.

return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow)
}

// CheckRead checks the read status, returns whether need update statistics and item.
func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) {
var ReadBytesPerSec uint64
var (
ReadBytesPerSec uint64
value *core.RegionStat
)
v, isExist := w.readFlow.Peek(region.GetId())
if isExist && !Simulating {
interval := time.Since(v.(*core.RegionStat).LastUpdateTime).Seconds()
value = v.(*core.RegionStat)
interval := time.Since(value.LastUpdateTime).Seconds()
if interval < minHotRegionReportInterval {
return false, nil
}
ReadBytesPerSec = uint64(float64(region.ReadBytes) / interval)
} else {
ReadBytesPerSec = uint64(float64(region.ReadBytes) / float64(RegionHeartBeatReportInterval))
}
region.ReadBytes = ReadBytesPerSec
hotRegionThreshold := calculateReadHotThreshold(stores)
return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow)
}

// incMetrics increments the hotCacheStatusGauge series identified by the
// event name and the flow kind. WriteFlow maps to the "write" type label and
// ReadFlow to "read"; any other kind is ignored and no series is touched.
func (w *HotSpotCache) incMetrics(name string, kind FlowKind) {
	var flowLabel string
	switch kind {
	case WriteFlow:
		flowLabel = "write"
	case ReadFlow:
		flowLabel = "read"
	default:
		// Unknown flow kinds are not recorded.
		return
	}
	hotCacheStatusGauge.WithLabelValues(name, flowLabel).Inc()
}

// calculateWriteHotThreshold derives the write hot-region threshold from the
// total written bytes reported by the stores.
//
// The total is divided by statCacheMaxLen * 2 * storeHeartBeatReportInterval:
// the number of hot regions is supposed to be statCacheMaxLen, the total
// covers the past storeHeartBeatReportInterval seconds, and the factor 2 is
// there because a store reports about twice the data that the region records
// as written to rocksdb. The result is never lower than
// hotWriteRegionMinFlowRate.
func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 {
	scale := float64(statCacheMaxLen) * 2 * storeHeartBeatReportInterval
	threshold := uint64(float64(stores.TotalWrittenBytes()) / scale)

	// Clamp to the configured minimum so a quiet cluster does not mark
	// everything as hot.
	if threshold >= hotWriteRegionMinFlowRate {
		return threshold
	}
	return hotWriteRegionMinFlowRate
}

func calculateReadHotThreshold(stores *core.StoresInfo) uint64 {
// hotRegionThreshold is used to pick hot regions
// suppose the number of the hot Regions is statLRUMaxLen
// and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions
Expand All @@ -95,26 +119,10 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf
if hotRegionThreshold < hotReadRegionMinFlowRate {
hotRegionThreshold = hotReadRegionMinFlowRate
}
return w.isNeedUpdateStatCache(region, hotRegionThreshold, ReadFlow)
return hotRegionThreshold
}

func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionThreshold uint64, kind FlowKind) (bool, *core.RegionStat) {
var (
v *core.RegionStat
value interface{}
isExist bool
flowBytes uint64
)
key := region.GetId()

switch kind {
case WriteFlow:
value, isExist = w.writeFlow.Peek(key)
flowBytes = region.WrittenBytes
case ReadFlow:
value, isExist = w.readFlow.Peek(key)
flowBytes = region.ReadBytes
}
func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, oldItem *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) {
newItem := &core.RegionStat{
RegionID: region.GetId(),
FlowBytes: flowBytes,
Expand All @@ -124,31 +132,27 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, hotRegionT
AntiCount: hotRegionAntiCount,
}

if isExist {
v = value.(*core.RegionStat)
newItem.HotDegree = v.HotDegree + 1
if oldItem != nil {
newItem.HotDegree = oldItem.HotDegree + 1
}
switch kind {
case WriteFlow:
if region.WrittenBytes >= hotRegionThreshold {
return true, newItem
}
case ReadFlow:
if region.ReadBytes >= hotRegionThreshold {
return true, newItem
if flowBytes >= hotRegionThreshold {
if oldItem == nil {
w.incMetrics("add_item", kind)
}
return true, newItem
}
// smaller than hotRegionThreshold
if !isExist {
if oldItem == nil {
return false, newItem
}
if v.AntiCount <= 0 {
if oldItem.AntiCount <= 0 {
w.incMetrics("remove_item", kind)
return true, nil
}
// eliminate some noise
newItem.HotDegree = v.HotDegree - 1
newItem.AntiCount = v.AntiCount - 1
newItem.FlowBytes = v.FlowBytes
newItem.HotDegree = oldItem.HotDegree - 1
newItem.AntiCount = oldItem.AntiCount - 1
newItem.FlowBytes = oldItem.FlowBytes
return true, newItem
}

Expand All @@ -160,12 +164,14 @@ func (w *HotSpotCache) Update(key uint64, item *core.RegionStat, kind FlowKind)
w.writeFlow.Remove(key)
} else {
w.writeFlow.Put(key, item)
w.incMetrics("update_item", kind)
}
case ReadFlow:
if item == nil {
w.readFlow.Remove(key)
} else {
w.readFlow.Put(key, item)
w.incMetrics("update_item", kind)
}
}
}
Expand Down Expand Up @@ -197,6 +203,16 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot
return nil
}

// CollectMetrics publishes the current state of the hot cache to
// hotCacheStatusGauge: the number of entries in the write and read flow
// caches ("total_length") and the write and read hot thresholds computed from
// stores ("hotThreshold").
func (w *HotSpotCache) CollectMetrics(stores *core.StoresInfo) {
	// Cache sizes.
	writeLen := float64(w.writeFlow.Len())
	hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(writeLen)
	readLen := float64(w.readFlow.Len())
	hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(readLen)

	// Current thresholds, recomputed from the store statistics.
	writeThreshold := calculateWriteHotThreshold(stores)
	hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(writeThreshold))
	readThreshold := calculateReadHotThreshold(stores)
	hotCacheStatusGauge.WithLabelValues("hotThreshold", "read").Set(float64(readThreshold))
}

func (w *HotSpotCache) isRegionHot(id uint64, hotThreshold int) bool {
if stat, ok := w.writeFlow.Peek(id); ok {
if stat.(*core.RegionStat).HotDegree >= hotThreshold {
Expand Down
9 changes: 9 additions & 0 deletions server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ var (
Help: "Bucketed histogram of processing time (s) of finished operator step.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
}, []string{"type"})

hotCacheStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "hotcache",
Name: "status",
Help: "Status of the hotspot.",
}, []string{"name", "type"})
)

// init registers this package's collectors with the default Prometheus
// registry. MustRegister panics if any registration fails.
func init() {
	prometheus.MustRegister(
		checkerCounter,
		operatorStepDuration,
		hotCacheStatusGauge,
	)
}
1 change: 0 additions & 1 deletion server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,6 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
r := tc.RandHotRegionFromStore(2, schedule.ReadFlow)
c.Assert(r, NotNil)
c.Assert(r.GetId(), Equals, uint64(2))
c.Assert(r.ReadBytes, Equals, uint64(512*1024))
// check hot items
stats := tc.HotCache.RegionStats(schedule.ReadFlow)
c.Assert(len(stats), Equals, 3)
Expand Down