diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 00000000000..815e04f5c54 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,93 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" +) + +// Cluster provides an overview of a cluster's basic information. +type Cluster interface { + GetHotStat() *statistics.HotStat + GetRegionStats() *statistics.RegionStatistics + GetLabelStats() *statistics.LabelStatistics + GetCoordinator() *schedule.Coordinator + GetRuleManager() *placement.RuleManager + GetBasicCluster() *core.BasicCluster +} + +// HandleStatsAsync handles the flow asynchronously. +func HandleStatsAsync(c Cluster, region *core.RegionInfo) { + checkWritePeerTask := func(cache *statistics.HotPeerCache) { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + + checkExpiredTask := func(cache *statistics.HotPeerCache) { + expiredStats := cache.CollectExpiredItems(region) + for _, stat := range expiredStats { + cache.UpdateStat(stat) + } + } + + c.GetHotStat().CheckWriteAsync(checkExpiredTask) + c.GetHotStat().CheckReadAsync(checkExpiredTask) + c.GetHotStat().CheckWriteAsync(checkWritePeerTask) + c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) +} + +// HandleOverlaps handles the overlap regions. +func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo) { + for _, item := range overlaps { + select { + case <-ctx.Done(): + return + default: + } + if c.GetRegionStats() != nil { + c.GetRegionStats().ClearDefunctRegion(item.GetID()) + } + c.GetLabelStats().MarkDefunctRegion(item.GetID()) + c.GetRuleManager().InvalidCache(item.GetID()) + } +} + +// Collect collects the cluster information. +func Collect(ctx context.Context, c Cluster, region *core.RegionInfo) { + // get region again from root tree. make sure the observed region is the latest. 
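+	// This task may run asynchronously after the heartbeat that scheduled it, so the
+	// region passed in can already be stale; a nil result from the root tree means the
+	// region has been removed in the meantime (e.g. merged away) and there is nothing to observe.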
+ bc := c.GetBasicCluster() + if bc == nil { + return + } + region = bc.GetRegion(region.GetID()) + if region == nil { + return + } + select { + case <-ctx.Done(): + return + default: + } + c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region)) +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go new file mode 100644 index 00000000000..5885a9cdb84 --- /dev/null +++ b/pkg/mcs/scheduling/server/cluster.go @@ -0,0 +1,711 @@ +package server + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +// Cluster is used to manage all information for scheduling purpose. +type Cluster struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + *core.BasicCluster + persistConfig *config.PersistConfig + ruleManager *placement.RuleManager + labelerManager *labeler.RegionLabeler + regionStats *statistics.RegionStatistics + labelStats *statistics.LabelStatistics + hotStat *statistics.HotStat + storage storage.Storage + coordinator *schedule.Coordinator + checkMembershipCh chan struct{} + apiServerLeader atomic.Value + clusterID uint64 + running atomic.Bool + + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner +} + +const ( + regionLabelGCInterval = time.Hour + requestTimeout = 3 * time.Second + collectWaitTime = time.Minute + + // heartbeat relative const + heartbeatTaskRunner = "heartbeat-task-runner" + miscTaskRunner = "misc-task-runner" + logTaskRunner = "log-task-runner" +) + +var syncRunner = ratelimit.NewSyncRunner() + +// NewCluster creates a new cluster. 
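+// It wires the region labeler, rule manager, statistics caches and coordinator on
+// top of the given BasicCluster, and initializes the rule manager with the persisted
+// replication settings; the derived context is cancelled if any step fails.
+// A minimal, hypothetical call site could look like:
+//
+//	c, err := NewCluster(ctx, persistConfig, storage, basicCluster, hbStreams, clusterID, checkMembershipCh)
+//	if err != nil {
+//		return err
+//	}
+//	c.StartBackgroundJobs()
+//	defer c.StopBackgroundJobs()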
+func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { + ctx, cancel := context.WithCancel(parentCtx) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) + if err != nil { + cancel() + return nil, err + } + ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig) + c := &Cluster{ + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + ruleManager: ruleManager, + labelerManager: labelerManager, + persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + storage: storage, + clusterID: clusterID, + checkMembershipCh: checkMembershipCh, + + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) + if err != nil { + cancel() + return nil, err + } + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator +} + +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (c *Cluster) GetStoresStats() *statistics.StoresStats { + return c.hotStat.StoresStats +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetRegionSplitter returns the region splitter. +func (c *Cluster) GetRegionSplitter() *splitter.RegionSplitter { + return c.coordinator.GetRegionSplitter() +} + +// GetRegionScatterer returns the region scatter. +func (c *Cluster) GetRegionScatterer() *scatter.RegionScatterer { + return c.coordinator.GetRegionScatterer() +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. 
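+// The configured hot-region-cache-hits-threshold is used as the minimum hot degree.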
+func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err + } + ctx, cancel := context.WithTimeout(c.ctx, requestTimeout) + defer cancel() + resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + if err != nil { + c.triggerMembershipCheck() + return 0, err + } + return resp.GetId(), nil +} + +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.triggerMembershipCheck() + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + +func (c *Cluster) triggerMembershipCheck() { + select { + case c.checkMembershipCh <- struct{}{}: + default: // avoid blocking + } +} + +// SwitchAPIServerLeader switches the API server leader. +func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { + old := c.apiServerLeader.Load() + return c.apiServerLeader.CompareAndSwap(old, new) +} + +func trySend(notifier chan struct{}) { + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } +} + +// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. +func (c *Cluster) updateScheduler() { + defer logutil.LogPanic() + defer c.wg.Done() + + // Make sure the coordinator has initialized all the existing schedulers. 
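+	// The reconciliation below diffs the persisted scheduler config against the
+	// schedulers already registered in the controller, so it must not start before
+	// the coordinator has loaded the existing ones.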
+ c.waitSchedulersInitialized() + // Establish a notifier to listen the schedulers updating. + notifier := make(chan struct{}, 1) + // Make sure the check will be triggered once later. + trySend(notifier) + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-notifier: + // This is triggered by the watcher when the schedulers are updated. + } + + if !c.running.Load() { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-ticker.C: + // retry + trySend(notifier) + continue + } + } + + log.Info("schedulers updating notifier is triggered, try to update the scheduler") + var ( + schedulersController = c.coordinator.GetSchedulersController() + latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers + ) + // Create the newly added schedulers. + for _, scheduler := range latestSchedulersConfig { + schedulerType := types.ConvertOldStrToType[scheduler.Type] + s, err := schedulers.CreateScheduler( + schedulerType, + c.coordinator.GetOperatorController(), + c.storage, + schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args), + schedulersController.RemoveScheduler, + ) + if err != nil { + log.Error("failed to create scheduler", + zap.String("scheduler-type", scheduler.Type), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + name := s.GetName() + if existed, _ := schedulersController.IsSchedulerExisted(name); existed { + log.Info("scheduler has already existed, skip adding it", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + continue + } + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + log.Error("failed to add scheduler", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + log.Info("add scheduler successfully", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + } + // Remove the deleted schedulers. + for _, name := range schedulersController.GetSchedulerNames() { + scheduler := schedulersController.GetScheduler(name) + oldType := types.SchedulerTypeCompatibleMap[scheduler.GetType()] + if slice.AnyOf(latestSchedulersConfig, func(i int) bool { + return latestSchedulersConfig[i].Type == oldType + }) { + continue + } + if err := schedulersController.RemoveScheduler(name); err != nil { + log.Error("failed to remove scheduler", + zap.String("scheduler-name", name), + errs.ZapError(err)) + continue + } + log.Info("remove scheduler successfully", + zap.String("scheduler-name", name)) + } + } +} + +func (c *Cluster) waitSchedulersInitialized() { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + if c.coordinator.AreSchedulersInitialized() { + return + } + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop waiting the schedulers initialization") + return + case <-ticker.C: + } + } +} + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
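+// TiFlash stores are excluded from the isolation-level calculation, and regions
+// previously marked as defunct (e.g. by overlap handling after a merge) are cleared
+// from the label statistics afterwards so their counters do not leak.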
+func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } + c.labelStats.ClearDefunctRegions() +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. +func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + checkReadPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(checkReadPeerTask) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckColdPeer(storeID, regions, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(collectUnReportedPeerTask) + return nil +} + +// runUpdateStoreStats updates store stats periodically. +func (c *Cluster) runUpdateStoreStats() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(9 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("update store stats background jobs has been stopped") + return + case <-ticker.C: + c.UpdateAllStoreStatus() + } + } +} + +// runCoordinator runs the main scheduling loop. 
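+// It blocks until the cluster context is cancelled; the collect wait time below
+// keeps the prepare checker from being skipped right after the service starts.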
+func (c *Cluster) runCoordinator() { + defer logutil.LogPanic() + defer c.wg.Done() + // force wait for 1 minute to make prepare checker won't be directly skipped + runCollectWaitTime := collectWaitTime + failpoint.Inject("changeRunCollectWaitTime", func() { + runCollectWaitTime = 1 * time.Second + }) + c.coordinator.RunUntilStop(runCollectWaitTime) +} + +func (c *Cluster) runMetricsCollectionJob() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("metrics are reset") + resetMetrics() + log.Info("metrics collection job has been stopped") + return + case <-ticker.C: + c.collectMetrics() + } + } +} + +func (c *Cluster) collectMetrics() { + statsMap := statistics.NewStoreStatisticsMap(c.persistConfig) + stores := c.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statistics.ObserveHotStat(s, c.hotStat.StoresStats) + } + statsMap.Collect() + + c.coordinator.GetSchedulersController().CollectSchedulerMetrics() + c.coordinator.CollectHotSpotMetrics() + if c.regionStats == nil { + return + } + c.regionStats.Collect() + c.labelStats.Collect() + // collect hot cache metrics + c.hotStat.CollectMetrics() + // collect the lock metrics + c.CollectWaitLockMetrics() +} + +func resetMetrics() { + statistics.Reset() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() +} + +// StartBackgroundJobs starts background jobs. +func (c *Cluster) StartBackgroundJobs() { + c.wg.Add(4) + go c.updateScheduler() + go c.runUpdateStoreStats() + go c.runCoordinator() + go c.runMetricsCollectionJob() + c.heartbeatRunner.Start(c.ctx) + c.miscRunner.Start(c.ctx) + c.logRunner.Start(c.ctx) + c.running.Store(true) +} + +// StopBackgroundJobs stops background jobs. +func (c *Cluster) StopBackgroundJobs() { + if !c.running.Load() { + return + } + c.running.Store(false) + c.coordinator.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() + c.logRunner.Stop() + c.cancel() + c.wg.Wait() +} + +// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose. +func (c *Cluster) IsBackgroundJobsRunning() bool { + return c.running.Load() +} + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + tracer := core.NewNoopHeartbeatProcessTracer() + if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { + tracer = core.NewHeartbeatProcessTracer() + } + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner + if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner + logRunner = c.logRunner + } + ctx := &core.MetaProcessContext{ + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, + } + tracer.Begin() + if err := c.processRegionHeartbeat(ctx, region); err != nil { + tracer.OnAllStageFinished() + return err + } + tracer.OnAllStageFinished() + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. 
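+// The flow is: pre-check the region against its origin, feed the write flow into
+// the hot statistics asynchronously, then, when the cache needs refreshing, update
+// the root tree and sub tree, invalidate overlapped regions, and recollect the
+// region statistics through the task runners.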
+func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error { + tracer := ctx.Tracer + origin, _, err := c.PreCheckPutRegion(region) + tracer.OnPreCheckFinished() + if err != nil { + return err + } + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) + cluster.HandleStatsAsync(c, region) + tracer.OnAsyncHotStatsFinished() + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + regionID := region.GetID() + if !saveCache { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + ctx.TaskRunner.RunTask( + regionID, + ratelimit.ObserveRegionStatsAsync, + func(ctx context.Context) { + cluster.Collect(ctx, c, region) + }, + ) + } + // region is not updated to the subtree. + if origin.GetRef() < 2 { + ctx.TaskRunner.RunTask( + regionID, + ratelimit.UpdateSubTree, + func(context.Context) { + c.CheckAndPutSubTree(region) + }, + ratelimit.WithRetained(true), + ) + } + return nil + } + tracer.OnSaveCacheBegin() + var overlaps []*core.RegionInfo + if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However, it can't solve the race condition of concurrent heartbeats from the same region. + + // Async task in next PR. + if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil { + tracer.OnSaveCacheFinished() + return err + } + ctx.TaskRunner.RunTask( + regionID, + ratelimit.UpdateSubTree, + func(context.Context) { + c.CheckAndPutSubTree(region) + }, + ratelimit.WithRetained(retained), + ) + tracer.OnUpdateSubTreeFinished() + ctx.TaskRunner.RunTask( + regionID, + ratelimit.HandleOverlaps, + func(ctx context.Context) { + cluster.HandleOverlaps(ctx, c, overlaps) + }, + ) + } + tracer.OnSaveCacheFinished() + if hasRegionStats { + // handle region stats + ctx.TaskRunner.RunTask( + regionID, + ratelimit.CollectRegionStatsAsync, + func(ctx context.Context) { + cluster.Collect(ctx, c, region) + }, + ) + } + + tracer.OnCollectRegionStatsFinished() + return nil +} + +// IsPrepared return true if the prepare checker is ready. +func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (c *Cluster) SetPrepared() { + c.coordinator.GetPrepareChecker().SetPrepared() +} + +// IsSchedulingHalted returns whether the scheduling is halted. +// Currently, the microservice scheduling is halted when: +// - The `HaltScheduling` persist option is set to true. +func (c *Cluster) IsSchedulingHalted() bool { + return c.persistConfig.IsSchedulingHalted() +} diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go new file mode 100644 index 00000000000..7e51a8a7bdd --- /dev/null +++ b/pkg/statistics/region_collection.go @@ -0,0 +1,524 @@ +// Copyright 2018 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "time" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" + "go.uber.org/zap" +) + +// RegionInfoProvider is an interface to provide the region information. +type RegionInfoProvider interface { + // GetRegion returns the region information according to the given region ID. + GetRegion(regionID uint64) *core.RegionInfo +} + +// RegionStatisticType represents the type of the region's status. +type RegionStatisticType uint16 + +const emptyStatistic = RegionStatisticType(0) + +// region status type +const ( + MissPeer RegionStatisticType = 1 << iota + ExtraPeer + DownPeer + PendingPeer + OfflinePeer + LearnerPeer + EmptyRegion + OversizedRegion + UndersizedRegion + WitnessLeader +) + +var regionStatisticTypes = []RegionStatisticType{ + MissPeer, + ExtraPeer, + DownPeer, + PendingPeer, + OfflinePeer, + LearnerPeer, + EmptyRegion, + OversizedRegion, + UndersizedRegion, + WitnessLeader, +} + +const nonIsolation = "none" + +var ( + // WithLabelValues is a heavy operation, define variable to avoid call it every time. + regionMissPeerRegionCounter = regionStatusGauge.WithLabelValues("miss-peer-region-count") + regionExtraPeerRegionCounter = regionStatusGauge.WithLabelValues("extra-peer-region-count") + regionDownPeerRegionCounter = regionStatusGauge.WithLabelValues("down-peer-region-count") + regionPendingPeerRegionCounter = regionStatusGauge.WithLabelValues("pending-peer-region-count") + regionOfflinePeerRegionCounter = regionStatusGauge.WithLabelValues("offline-peer-region-count") + regionLearnerPeerRegionCounter = regionStatusGauge.WithLabelValues("learner-peer-region-count") + regionEmptyRegionCounter = regionStatusGauge.WithLabelValues("empty-region-count") + regionOversizedRegionCounter = regionStatusGauge.WithLabelValues("oversized-region-count") + regionUndersizedRegionCounter = regionStatusGauge.WithLabelValues("undersized-region-count") + regionWitnessLeaderRegionCounter = regionStatusGauge.WithLabelValues("witness-leader-region-count") +) + +// RegionInfoWithTS is used to record the extra timestamp status of a region. +type RegionInfoWithTS struct { + startMissVoterPeerTS int64 + startDownPeerTS int64 +} + +// RegionStatistics is used to record the status of regions. +type RegionStatistics struct { + syncutil.RWMutex + rip RegionInfoProvider + conf sc.CheckerConfigProvider + stats map[RegionStatisticType]map[uint64]any + index map[uint64]RegionStatisticType + ruleManager *placement.RuleManager +} + +// NewRegionStatistics creates a new RegionStatistics. 
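+// One bucket is pre-allocated per RegionStatisticType so Observe can index the
+// stats maps directly, and a reverse index keeps track of which statistic types
+// each region currently matches.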
+func NewRegionStatistics( + rip RegionInfoProvider, + conf sc.CheckerConfigProvider, + ruleManager *placement.RuleManager, +) *RegionStatistics { + r := &RegionStatistics{ + rip: rip, + conf: conf, + ruleManager: ruleManager, + stats: make(map[RegionStatisticType]map[uint64]any), + index: make(map[uint64]RegionStatisticType), + } + for _, typ := range regionStatisticTypes { + r.stats[typ] = make(map[uint64]any) + } + return r +} + +// GetRegionStatsByType gets the status of the region by types. +// The regions here need to be cloned, otherwise, it may cause data race problems. +func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { + r.RLock() + defer r.RUnlock() + res := make([]*core.RegionInfo, 0, len(r.stats[typ])) + for regionID := range r.stats[typ] { + region := r.rip.GetRegion(regionID) + if region == nil { + continue + } + res = append(res, region.Clone()) + } + return res +} + +// IsRegionStatsType returns whether the status of the region is the given type. +func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisticType) bool { + r.RLock() + defer r.RUnlock() + _, exist := r.stats[typ][regionID] + return exist +} + +func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID uint64) { + for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { + if deleteIndex&typ != 0 { + delete(r.stats[typ], regionID) + } + } +} + +// RegionStatsNeedUpdate checks whether the region's status need to be updated +// due to some special state types. +func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { + regionID := region.GetID() + if !r.isObserved(regionID) { + return true + } + if r.IsRegionStatsType(regionID, OversizedRegion) != + region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) { + return true + } + + if r.IsRegionStatsType(regionID, PendingPeer) != (len(region.GetPendingPeers()) != 0) { + return true + } + if r.IsRegionStatsType(regionID, DownPeer) != (len(region.GetDownPeers()) != 0) { + return true + } + if r.IsRegionStatsType(regionID, LearnerPeer) != (len(region.GetLearners()) != 0) { + return true + } + + // merge + return r.IsRegionStatsType(regionID, UndersizedRegion) != + region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys())) +} + +// isObserved returns whether the region is observed. And it also shows whether PD received heartbeat of this region. +func (r *RegionStatistics) isObserved(id uint64) bool { + r.RLock() + defer r.RUnlock() + _, ok := r.index[id] + return ok +} + +// Observe records the current regions' status. +func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) { + r.Lock() + defer r.Unlock() + var ( + desiredReplicas = r.conf.GetMaxReplicas() + desiredVoters = desiredReplicas + peerTypeIndex RegionStatisticType + ) + // Check if the region meets count requirements of its rules. 
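+	// With placement rules enabled, the desired replica and voter counts are derived
+	// from the rules that apply to this region (learner rules add replicas but not
+	// voters); otherwise the configured max-replicas applies to both.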
+ if r.conf.IsPlacementRulesEnabled() { + if !r.ruleManager.IsInitialized() { + log.Warn("ruleManager haven't been initialized") + return + } + desiredReplicas = 0 + desiredVoters = 0 + rules := r.ruleManager.GetRulesForApplyRegion(region) + for _, rule := range rules { + desiredReplicas += rule.Count + if rule.Role != placement.Learner { + desiredVoters += rule.Count + } + } + } + + peers := region.GetPeers() + downPeers := region.GetDownPeers() + pendingPeers := region.GetPendingPeers() + learners := region.GetLearners() + voters := region.GetVoters() + regionSize := region.GetApproximateSize() + regionMaxSize := int64(r.conf.GetRegionMaxSize()) + regionMaxKeys := int64(r.conf.GetRegionMaxKeys()) + maxMergeRegionSize := int64(r.conf.GetMaxMergeRegionSize()) + maxMergeRegionKeys := int64(r.conf.GetMaxMergeRegionKeys()) + leaderIsWitness := region.GetLeader().GetIsWitness() + + // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. + // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. + // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. + var conditions RegionStatisticType + if len(peers) < desiredReplicas { + conditions |= MissPeer + } + if len(peers) > desiredReplicas { + conditions |= ExtraPeer + } + if len(downPeers) > 0 { + conditions |= DownPeer + } + if len(pendingPeers) > 0 { + conditions |= PendingPeer + } + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + conditions |= OfflinePeer + break + } + } + } + if len(learners) > 0 { + conditions |= LearnerPeer + } + if regionSize <= core.EmptyRegionApproximateSize { + conditions |= EmptyRegion + } + if region.IsOversized(regionMaxSize, regionMaxKeys) { + conditions |= OversizedRegion + } + if region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys) { + conditions |= UndersizedRegion + } + if leaderIsWitness { + conditions |= WitnessLeader + } + // Check if the region meets any of the conditions and update the corresponding info. 
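+	// MissPeer and DownPeer keep a timestamp so the duration of the abnormal state can
+	// be observed; the other types only record presence, and any type the region no
+	// longer matches is removed from the index afterwards.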
+ regionID := region.GetID() + for i := 0; i < len(regionStatisticTypes); i++ { + condition := RegionStatisticType(1 << i) + if conditions&condition == 0 { + continue + } + info := r.stats[condition][regionID] + // The condition is met + switch condition { + case MissPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if len(voters) < desiredVoters { + if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { + regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) + } else { + info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() + } + } + case DownPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if info.(*RegionInfoWithTS).startDownPeerTS != 0 { + regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) + } else { + info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() + logDownPeerWithNoDisconnectedStore(region, stores) + } + case ExtraPeer: + fallthrough + case PendingPeer: + fallthrough + case OfflinePeer: + fallthrough + case LearnerPeer: + fallthrough + case EmptyRegion: + fallthrough + case OversizedRegion: + fallthrough + case UndersizedRegion: + fallthrough + case WitnessLeader: + info = struct{}{} + } + r.stats[condition][regionID] = info + peerTypeIndex |= condition + } + // Remove the info if any of the conditions are not met any more. + if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic { + deleteIndex := oldIndex &^ peerTypeIndex + r.deleteEntry(deleteIndex, regionID) + } + r.index[regionID] = peerTypeIndex +} + +// ClearDefunctRegion is used to handle the overlap region. +func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) { + r.Lock() + defer r.Unlock() + if oldIndex, ok := r.index[regionID]; ok { + delete(r.index, regionID) + if oldIndex > emptyStatistic { + r.deleteEntry(oldIndex, regionID) + } + } +} + +// Collect collects the metrics of the regions' status. +func (r *RegionStatistics) Collect() { + r.RLock() + defer r.RUnlock() + regionMissPeerRegionCounter.Set(float64(len(r.stats[MissPeer]))) + regionExtraPeerRegionCounter.Set(float64(len(r.stats[ExtraPeer]))) + regionDownPeerRegionCounter.Set(float64(len(r.stats[DownPeer]))) + regionPendingPeerRegionCounter.Set(float64(len(r.stats[PendingPeer]))) + regionOfflinePeerRegionCounter.Set(float64(len(r.stats[OfflinePeer]))) + regionLearnerPeerRegionCounter.Set(float64(len(r.stats[LearnerPeer]))) + regionEmptyRegionCounter.Set(float64(len(r.stats[EmptyRegion]))) + regionOversizedRegionCounter.Set(float64(len(r.stats[OversizedRegion]))) + regionUndersizedRegionCounter.Set(float64(len(r.stats[UndersizedRegion]))) + regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) +} + +// ResetRegionStatsMetrics resets the metrics of the regions' status. +func ResetRegionStatsMetrics() { + regionMissPeerRegionCounter.Set(0) + regionExtraPeerRegionCounter.Set(0) + regionDownPeerRegionCounter.Set(0) + regionPendingPeerRegionCounter.Set(0) + regionOfflinePeerRegionCounter.Set(0) + regionLearnerPeerRegionCounter.Set(0) + regionEmptyRegionCounter.Set(0) + regionOversizedRegionCounter.Set(0) + regionUndersizedRegionCounter.Set(0) + regionWitnessLeaderRegionCounter.Set(0) +} + +// LabelStatistics is the statistics of the level of labels. +type LabelStatistics struct { + syncutil.RWMutex + regionLabelStats map[uint64]string + labelCounter map[string]int + defunctRegions map[uint64]struct{} +} + +// NewLabelStatistics creates a new LabelStatistics. 
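+// Besides the per-region isolation level and the per-level counters, it tracks a set
+// of defunct regions so that regions removed by merges can later be purged from the
+// counters via MarkDefunctRegion and ClearDefunctRegions.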
+func NewLabelStatistics() *LabelStatistics { + return &LabelStatistics{ + regionLabelStats: make(map[uint64]string), + labelCounter: make(map[string]int), + defunctRegions: make(map[uint64]struct{}), + } +} + +// Observe records the current label status. +func (l *LabelStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo, labels []string) { + regionID := region.GetID() + regionIsolation := GetRegionLabelIsolation(stores, labels) + l.Lock() + defer l.Unlock() + if label, ok := l.regionLabelStats[regionID]; ok { + if label == regionIsolation { + return + } + l.labelCounter[label]-- + } + l.regionLabelStats[regionID] = regionIsolation + l.labelCounter[regionIsolation]++ +} + +// Collect collects the metrics of the label status. +func (l *LabelStatistics) Collect() { + l.RLock() + defer l.RUnlock() + for level, count := range l.labelCounter { + regionLabelLevelGauge.WithLabelValues(level).Set(float64(count)) + } +} + +// ResetLabelStatsMetrics resets the metrics of the label status. +func ResetLabelStatsMetrics() { + regionLabelLevelGauge.Reset() +} + +// MarkDefunctRegion is used to handle the overlap region. +// It is used to mark the region as defunct and remove it from the label statistics later. +func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) { + l.Lock() + defer l.Unlock() + l.defunctRegions[regionID] = struct{}{} +} + +// ClearDefunctRegions is used to handle the overlap region. +// It is used to remove the defunct regions from the label statistics. +func (l *LabelStatistics) ClearDefunctRegions() { + l.Lock() + defer l.Unlock() + for regionID := range l.defunctRegions { + if label, ok := l.regionLabelStats[regionID]; ok { + l.labelCounter[label]-- + delete(l.regionLabelStats, regionID) + } + } + l.defunctRegions = make(map[uint64]struct{}) +} + +// GetLabelCounter is only used for tests. +func (l *LabelStatistics) GetLabelCounter() map[string]int { + l.RLock() + defer l.RUnlock() + clonedLabelCounter := make(map[string]int, len(l.labelCounter)) + for k, v := range l.labelCounter { + clonedLabelCounter[k] = v + } + return clonedLabelCounter +} + +// GetRegionLabelIsolation returns the isolation level of the region. +func GetRegionLabelIsolation(stores []*core.StoreInfo, labels []string) string { + if len(stores) == 0 || len(labels) == 0 { + return nonIsolation + } + queueStores := [][]*core.StoreInfo{stores} + for level, label := range labels { + newQueueStores := make([][]*core.StoreInfo, 0, len(stores)) + for _, stores := range queueStores { + notIsolatedStores := notIsolatedStoresWithLabel(stores, label) + if len(notIsolatedStores) > 0 { + newQueueStores = append(newQueueStores, notIsolatedStores...) + } + } + queueStores = newQueueStores + if len(queueStores) == 0 { + return labels[level] + } + } + return nonIsolation +} + +func notIsolatedStoresWithLabel(stores []*core.StoreInfo, label string) [][]*core.StoreInfo { + var emptyValueStores []*core.StoreInfo + valueStoresMap := make(map[string][]*core.StoreInfo) + + for _, s := range stores { + labelValue := s.GetLabelValue(label) + if labelValue == "" { + emptyValueStores = append(emptyValueStores, s) + } else { + valueStoresMap[labelValue] = append(valueStoresMap[labelValue], s) + } + } + + if len(valueStoresMap) == 0 { + // Usually it is because all TiKVs lack this label. + if len(emptyValueStores) > 1 { + return [][]*core.StoreInfo{emptyValueStores} + } + return nil + } + + var res [][]*core.StoreInfo + if len(emptyValueStores) == 0 { + // No TiKV lacks this label. 
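+		// Only label values shared by more than one store break isolation at this level;
+		// a value held by a single store is already isolated and is dropped here.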
+ for _, stores := range valueStoresMap { + if len(stores) > 1 { + res = append(res, stores) + } + } + } else { + // Usually it is because some TiKVs lack this label. + // The TiKVs in each label and the TiKVs without label form a group. + for _, stores := range valueStoresMap { + stores = append(stores, emptyValueStores...) + res = append(res, stores) + } + } + return res +} + +// logDownPeerWithNoDisconnectedStore logs down peers on connected stores. +// It won't log down peer when any store of the replica is disconnected which is +// used to avoid too many logs when a store is disconnected. +// TODO: it's not a good way to log down peer during process region heartbeat, we should handle it in another way. +// region: the region which has down peer +// stores: all stores that the region has peer on them +func logDownPeerWithNoDisconnectedStore(region *core.RegionInfo, stores []*core.StoreInfo) { + for _, store := range stores { + if store.IsDisconnected() { + return + } + } + for _, p := range region.GetDownPeers() { + log.Warn("region has down peer on connected store", + zap.Uint64("region-id", region.GetID()), + zap.Uint64("down-peer", p.GetPeer().GetId()), + zap.Uint64("down-seconds", p.GetDownSeconds()), + zap.Uint64("store-id", p.GetPeer().GetStoreId())) + } +} diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index f3365b1665e..09de008dec9 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1081,7 +1081,12 @@ func TestRegionLabelIsolationLevel(t *testing.T) { cfg.LocationLabels = []string{"zone"} opt.SetReplicationConfig(cfg) re.NoError(err) +<<<<<<< HEAD cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) +======= + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) +>>>>>>> 26123dc75 (checker, statistic: avoid leak in label statistic (#8703)) for i := uint64(1); i <= 4; i++ { var labels []*metapb.StoreLabel @@ -1116,11 +1121,45 @@ func TestRegionLabelIsolationLevel(t *testing.T) { StartKey: []byte{byte(1)}, EndKey: []byte{byte(2)}, } - r := core.NewRegionInfo(region, peers[0]) - re.NoError(cluster.putRegion(r)) + r1 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r1)) +<<<<<<< HEAD cluster.updateRegionsLabelLevelStats([]*core.RegionInfo{r}) counter := cluster.labelLevelStats.GetLabelCounter() +======= + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1}) + counter := cluster.labelStats.GetLabelCounter() +>>>>>>> 26123dc75 (checker, statistic: avoid leak in label statistic (#8703)) + re.Equal(0, counter["none"]) + re.Equal(1, counter["zone"]) + + region = &metapb.Region{ + Id: 10, + Peers: peers, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + } + r2 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r2)) + + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + re.Equal(2, counter["zone"]) + + // issue: https://github.com/tikv/pd/issues/8700 + // step1: heartbeat a overlap region, which is used to simulate the case that the region is merged. + // step2: update region 9 and region 10, which is used to simulate the case that patrol is triggered. + // We should only count region 9. 
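+	// The overlap heartbeat below covers the key ranges of both region 9 and region 10,
+	// so region 10 is expected to be marked defunct and dropped from the label counter
+	// on the next UpdateRegionsLabelLevelStats call, leaving only region 9 counted.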
+ overlapRegion := r1.Clone( + core.WithStartKey(r1.GetStartKey()), + core.WithEndKey(r2.GetEndKey()), + core.WithLeader(r2.GetPeer(8)), + ) + re.NoError(cluster.HandleRegionHeartbeat(overlapRegion)) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1, r2}) + counter = cluster.labelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) } diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go new file mode 100644 index 00000000000..8578b3480d8 --- /dev/null +++ b/server/cluster/scheduling_controller.go @@ -0,0 +1,492 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/checker" + sc "github.com/tikv/pd/pkg/schedule/config" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" +) + +// schedulingController is used to manage all schedulers and checkers. +type schedulingController struct { + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + mu syncutil.RWMutex + wg sync.WaitGroup + *core.BasicCluster + opt sc.ConfProvider + coordinator *schedule.Coordinator + labelStats *statistics.LabelStatistics + regionStats *statistics.RegionStatistics + hotStat *statistics.HotStat + slowStat *statistics.SlowStat + running bool +} + +// newSchedulingController creates a new scheduling controller. 
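+// The controller bundles the hot, slow, label and region statistics together with the
+// coordinator lifecycle; its context is derived from the parent context so the
+// scheduling jobs can be stopped and started again without affecting the caller.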
+func newSchedulingController(parentCtx context.Context, basicCluster *core.BasicCluster, opt sc.ConfProvider, ruleManager *placement.RuleManager) *schedulingController { + ctx, cancel := context.WithCancel(parentCtx) + return &schedulingController{ + parentCtx: parentCtx, + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + opt: opt, + labelStats: statistics.NewLabelStatistics(), + hotStat: statistics.NewHotStat(parentCtx), + slowStat: statistics.NewSlowStat(), + regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), + } +} + +func (sc *schedulingController) stopSchedulingJobs() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + if !sc.running { + return false + } + sc.coordinator.Stop() + sc.cancel() + sc.wg.Wait() + sc.running = false + log.Info("scheduling service is stopped") + return true +} + +func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.running { + return + } + sc.initCoordinatorLocked(sc.parentCtx, cluster, hbstreams) + sc.wg.Add(3) + go sc.runCoordinator() + go sc.runStatsBackgroundJobs() + go sc.runSchedulingMetricsCollectionJob() + sc.running = true + log.Info("scheduling service is started") +} + +func (sc *schedulingController) initCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.initCoordinatorLocked(ctx, cluster, hbstreams) + sc.coordinator.InitSchedulers(false) +} + +func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.ctx, sc.cancel = context.WithCancel(ctx) + sc.coordinator = schedule.NewCoordinator(sc.ctx, cluster, hbstreams) +} + +// runCoordinator runs the main scheduling loop. 
+func (sc *schedulingController) runCoordinator() { + defer logutil.LogPanic() + defer sc.wg.Done() + sc.coordinator.RunUntilStop() +} + +func (sc *schedulingController) runStatsBackgroundJobs() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(statistics.RegionsStatsObserveInterval) + defer ticker.Stop() + + for _, store := range sc.GetStores() { + storeID := store.GetID() + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + } + for { + select { + case <-sc.ctx.Done(): + log.Info("statistics background jobs has been stopped") + return + case <-ticker.C: + sc.hotStat.ObserveRegionsStats(sc.GetStoresWriteRate()) + } + } +} + +func (sc *schedulingController) runSchedulingMetricsCollectionJob() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(metricsCollectionJobInterval) + failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Reset(time.Millisecond) + }) + defer ticker.Stop() + + for { + select { + case <-sc.ctx.Done(): + log.Info("scheduling metrics are reset") + resetSchedulingMetrics() + log.Info("scheduling metrics collection job has been stopped") + return + case <-ticker.C: + sc.collectSchedulingMetrics() + } + } +} + +func resetSchedulingMetrics() { + statistics.Reset() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() + statistics.ResetRegionStatsMetrics() + statistics.ResetLabelStatsMetrics() + // reset hot cache metrics + statistics.ResetHotCacheStatusMetrics() +} + +func (sc *schedulingController) collectSchedulingMetrics() { + statsMap := statistics.NewStoreStatisticsMap(sc.opt) + stores := sc.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statistics.ObserveHotStat(s, sc.hotStat.StoresStats) + } + statsMap.Collect() + sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() + sc.coordinator.CollectHotSpotMetrics() + if sc.regionStats == nil { + return + } + sc.regionStats.Collect() + sc.labelStats.Collect() + // collect hot cache metrics + sc.hotStat.CollectMetrics() + // collect the lock metrics + sc.CollectWaitLockMetrics() +} + +func (sc *schedulingController) removeStoreStatistics(storeID uint64) { + sc.hotStat.RemoveRollingStoreStats(storeID) + sc.slowStat.RemoveSlowStoreStatus(storeID) +} + +func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow) +} + +// GetHotStat gets hot stat. +func (sc *schedulingController) GetHotStat() *statistics.HotStat { + return sc.hotStat +} + +// GetRegionStats gets region statistics. +func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics { + return sc.regionStats +} + +// GetLabelStats gets label statistics. +func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics { + return sc.labelStats +} + +// GetRegionStatsByType gets the status of the region by types. +func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { + if sc.regionStats == nil { + return nil + } + return sc.regionStats.GetRegionStatsByType(typ) +} + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
+func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) + } + sc.labelStats.ClearDefunctRegions() +} + +func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (sc *schedulingController) GetStoresStats() *statistics.StoresStats { + return sc.hotStat.StoresStats +} + +// GetStoresLoads returns load stats of all stores. +func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 { + return sc.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool { + return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := sc.opt.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return sc.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return sc.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetCoordinator returns the coordinator. +func (sc *schedulingController) GetCoordinator() *schedule.Coordinator { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator +} + +// GetPausedSchedulerDelayAt returns DelayAt of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayAt(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayAt(name) +} + +// GetPausedSchedulerDelayUntil returns DelayUntil of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayUntil(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) +} + +// GetOperatorController returns the operator controller. 
+func (sc *schedulingController) GetOperatorController() *operator.Controller { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetOperatorController() +} + +// GetRegionScatterer returns the region scatter. +func (sc *schedulingController) GetRegionScatterer() *scatter.RegionScatterer { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRegionScatterer() +} + +// GetRegionSplitter returns the region splitter +func (sc *schedulingController) GetRegionSplitter() *splitter.RegionSplitter { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRegionSplitter() +} + +// GetMergeChecker returns merge checker. +func (sc *schedulingController) GetMergeChecker() *checker.MergeChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetMergeChecker() +} + +// GetRuleChecker returns rule checker. +func (sc *schedulingController) GetRuleChecker() *checker.RuleChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRuleChecker() +} + +// GetSchedulers gets all schedulers. +func (sc *schedulingController) GetSchedulers() []string { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetSchedulerNames() +} + +// GetSchedulerHandlers gets all scheduler handlers. +func (sc *schedulingController) GetSchedulerHandlers() map[string]http.Handler { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetSchedulerHandlers() +} + +// AddSchedulerHandler adds a scheduler handler. +func (sc *schedulingController) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +} + +// RemoveSchedulerHandler removes a scheduler handler. +func (sc *schedulingController) RemoveSchedulerHandler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) +} + +// AddScheduler adds a scheduler. +func (sc *schedulingController) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) +} + +// RemoveScheduler removes a scheduler. +func (sc *schedulingController) RemoveScheduler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().RemoveScheduler(name) +} + +// PauseOrResumeScheduler pauses or resumes a scheduler. +func (sc *schedulingController) PauseOrResumeScheduler(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().PauseOrResumeScheduler(name, t) +} + +// PauseOrResumeChecker pauses or resumes checker. +func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.PauseOrResumeChecker(name, t) +} + +// AddPendingProcessedRegions adds regions to suspect list. +func (sc *schedulingController) AddPendingProcessedRegions(needCheckLen bool, regionIDs ...uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().AddPendingProcessedRegions(needCheckLen, regionIDs...) +} + +// GetPendingProcessedRegions gets all suspect regions. 
+func (sc *schedulingController) GetPendingProcessedRegions() []uint64 { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetCheckerController().GetPendingProcessedRegions() +} + +// RemovePendingProcessedRegion removes region from pending processed regions. +func (sc *schedulingController) RemovePendingProcessedRegion(id uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().RemovePendingProcessedRegion(id) +} + +// PopOneSuspectKeyRange gets one suspect keyRange group. +// it would return value and true if pop success, or return empty [][2][]byte and false +// if suspectKeyRanges couldn't pop keyRange group. +func (sc *schedulingController) PopOneSuspectKeyRange() ([2][]byte, bool) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetCheckerController().PopOneSuspectKeyRange() +} + +// ClearSuspectKeyRanges clears the suspect keyRanges, only for unit test +func (sc *schedulingController) ClearSuspectKeyRanges() { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().ClearSuspectKeyRanges() +} + +// AddSuspectKeyRange adds the key range with the its ruleID as the key +// The instance of each keyRange is like following format: +// [2][]byte: start key/end key +func (sc *schedulingController) AddSuspectKeyRange(start, end []byte) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().AddSuspectKeyRange(start, end) +} + +func (sc *schedulingController) getEvictLeaderStores() (evictStores []uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + if sc.coordinator == nil { + return nil + } + handler, ok := sc.coordinator.GetSchedulersController().GetSchedulerHandlers()[types.EvictLeaderScheduler.String()] + if !ok { + return + } + type evictLeaderHandler interface { + EvictStoreIDs() []uint64 + } + h, ok := handler.(evictLeaderHandler) + if !ok { + return + } + return h.EvictStoreIDs() +} + +// IsPrepared return true if the prepare checker is ready. +func (sc *schedulingController) IsPrepared() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (sc *schedulingController) SetPrepared() { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetPrepareChecker().SetPrepared() +} + +// IsSchedulingControllerRunning returns whether the scheduling controller is running. Only for test purpose. +func (sc *schedulingController) IsSchedulingControllerRunning() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.running +}