diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 475b0a34939..f8114720d2f 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -185,38 +185,38 @@ func (c *Config) Validate(log log.Logger) error { // Cortex is the root datastructure for Cortex. type Cortex struct { - target ModuleName + Cfg Config // set during initialization - serviceMap map[ModuleName]services.Service - - api *api.API - server *server.Server - ring *ring.Ring - overrides *validation.Overrides - distributor *distributor.Distributor - ingester *ingester.Ingester - flusher *flusher.Flusher - store chunk.Store - deletesStore *purger.DeleteStore - frontend *frontend.Frontend - tableManager *chunk.TableManager - cache cache.Cache - runtimeConfig *runtimeconfig.Manager - dataPurger *purger.DataPurger - tombstonesLoader *purger.TombstonesLoader - - ruler *ruler.Ruler - configAPI *configAPI.API - configDB db.DB - alertmanager *alertmanager.MultitenantAlertmanager - compactor *compactor.Compactor - storeGateway *storegateway.StoreGateway - memberlistKV *memberlist.KVInit + ServiceMap map[ModuleName]services.Service + + API *api.API + Server *server.Server + Ring *ring.Ring + Overrides *validation.Overrides + Distributor *distributor.Distributor + Ingester *ingester.Ingester + Flusher *flusher.Flusher + Store chunk.Store + DeletesStore *purger.DeleteStore + Frontend *frontend.Frontend + TableManager *chunk.TableManager + Cache cache.Cache + RuntimeConfig *runtimeconfig.Manager + DataPurger *purger.DataPurger + TombstonesLoader *purger.TombstonesLoader + + Ruler *ruler.Ruler + ConfigAPI *configAPI.API + ConfigDB db.DB + Alertmanager *alertmanager.MultitenantAlertmanager + Compactor *compactor.Compactor + StoreGateway *storegateway.StoreGateway + MemberlistKV *memberlist.KVInit // Queryable that the querier should use to query the long // term storage. It depends on the storage engine used. - storeQueryable prom_storage.Queryable + StoreQueryable prom_storage.Queryable } // New makes a new Cortex. @@ -229,28 +229,28 @@ func New(cfg Config) (*Cortex, error) { } cortex := &Cortex{ - target: cfg.Target, + Cfg: cfg, } - cortex.setupAuthMiddleware(&cfg) + cortex.setupAuthMiddleware() - serviceMap, err := cortex.initModuleServices(&cfg, cfg.Target) + serviceMap, err := cortex.initModuleServices() if err != nil { return nil, err } - cortex.serviceMap = serviceMap - cortex.api.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler)) + cortex.ServiceMap = serviceMap + cortex.API.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler)) return cortex, nil } -func (t *Cortex) setupAuthMiddleware(cfg *Config) { - if cfg.AuthEnabled { - cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ +func (t *Cortex) setupAuthMiddleware() { + if t.Cfg.AuthEnabled { + t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ middleware.ServerUserHeaderInterceptor, } - cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { switch info.FullMethod { // Don't check auth header on TransferChunks, as we weren't originally @@ -266,22 +266,22 @@ func (t *Cortex) setupAuthMiddleware(cfg *Config) { }, } } else { - cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ fakeGRPCAuthUniaryMiddleware, } - cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ fakeGRPCAuthStreamMiddleware, } - cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware + t.Cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware } } -func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleName]services.Service, error) { +func (t *Cortex) initModuleServices() (map[ModuleName]services.Service, error) { servicesMap := map[ModuleName]services.Service{} // initialize all of our dependencies first - deps := orderedDeps(target) - deps = append(deps, target) // lastly, initialize the requested module + deps := orderedDeps(t.Cfg.Target) + deps = append(deps, t.Cfg.Target) // lastly, initialize the requested module for ix, n := range deps { mod := modules[n] @@ -289,13 +289,13 @@ func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleN var serv services.Service if mod.service != nil { - s, err := mod.service(t, cfg) + s, err := mod.service(t) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } serv = s } else if mod.wrappedService != nil { - s, err := mod.wrappedService(t, cfg) + s, err := mod.wrappedService(t) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } @@ -318,7 +318,7 @@ func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleN func (t *Cortex) Run() error { // get all services, create service manager and tell it to start servs := []services.Service(nil) - for _, s := range t.serviceMap { + for _, s := range t.ServiceMap { servs = append(servs, s) } @@ -329,8 +329,8 @@ func (t *Cortex) Run() error { // before starting servers, register /ready handler and gRPC health check service. // It should reflect entire Cortex. - t.server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) - grpc_health_v1.RegisterHealthServer(t.server.GRPC, healthcheck.New(sm)) + t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC, healthcheck.New(sm)) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(util.Logger).Log("msg", "Cortex started") } @@ -340,7 +340,7 @@ func (t *Cortex) Run() error { sm.StopAsync() // let's find out which module failed - for m, s := range t.serviceMap { + for m, s := range t.ServiceMap { if s == service { if service.FailureCase() == util.ErrStopProcess { level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase()) @@ -361,7 +361,7 @@ func (t *Cortex) Run() error { // It will also be stopped via service manager if any service fails (see attached service listener) // Attach listener before starting services, or we may miss the notification. serverStopping := make(chan struct{}) - t.serviceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) { + t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) { close(serverStopping) }, nil, nil)) @@ -411,8 +411,8 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { // Ingester has a special check that makes sure that it was able to register into the ring, // and that all other ring entries are OK too. - if t.ingester != nil { - if err := t.ingester.CheckReady(r.Context()); err != nil { + if t.Ingester != nil { + if err := t.Ingester.CheckReady(r.Context()); err != nil { http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable) return } diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index dee9bbad728..589d1b44f85 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -52,18 +52,18 @@ func TestCortex(t *testing.T) { c, err := New(cfg) require.NoError(t, err) - require.NotNil(t, c.serviceMap) + require.NotNil(t, c.ServiceMap) - for m, s := range c.serviceMap { + for m, s := range c.ServiceMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, c.serviceMap[Server]) - require.NotNil(t, c.serviceMap[Ingester]) - require.NotNil(t, c.serviceMap[Ring]) - require.NotNil(t, c.serviceMap[Distributor]) + require.NotNil(t, c.ServiceMap[Server]) + require.NotNil(t, c.ServiceMap[Ingester]) + require.NotNil(t, c.ServiceMap[Ring]) + require.NotNil(t, c.ServiceMap[Distributor]) // check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned. require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps)) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index e2be5691e0d..f86744c639d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -90,33 +90,33 @@ func (m *ModuleName) UnmarshalYAML(unmarshal func(interface{}) error) error { return m.Set(s) } -func (t *Cortex) initAPI(cfg *Config) (services.Service, error) { - cfg.API.ServerPrefix = cfg.Server.PathPrefix - cfg.API.LegacyHTTPPrefix = cfg.HTTPPrefix +func (t *Cortex) initAPI() (services.Service, error) { + t.Cfg.API.ServerPrefix = t.Cfg.Server.PathPrefix + t.Cfg.API.LegacyHTTPPrefix = t.Cfg.HTTPPrefix - a, err := api.New(cfg.API, t.server, util.Logger) + a, err := api.New(t.Cfg.API, t.Server, util.Logger) if err != nil { return nil, err } - t.api = a + t.API = a - t.api.RegisterAPI(cfg) + t.API.RegisterAPI(t.Cfg) return nil, nil } -func (t *Cortex) initServer(cfg *Config) (services.Service, error) { - serv, err := server.New(cfg.Server) +func (t *Cortex) initServer() (services.Service, error) { + serv, err := server.New(t.Cfg.Server) if err != nil { return nil, err } - t.server = serv + t.Server = serv servicesToWaitFor := func() []services.Service { svs := []services.Service(nil) - for m, s := range t.serviceMap { + for m, s := range t.ServiceMap { // Server should not wait for itself. if m != Server { svs = append(svs, s) @@ -125,84 +125,84 @@ func (t *Cortex) initServer(cfg *Config) (services.Service, error) { return svs } - s := NewServerService(t.server, servicesToWaitFor) + s := NewServerService(t.Server, servicesToWaitFor) return s, nil } -func (t *Cortex) initRing(cfg *Config) (serv services.Service, err error) { - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) +func (t *Cortex) initRing() (serv services.Service, err error) { + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) if err != nil { return nil, err } - prometheus.MustRegister(t.ring) + prometheus.MustRegister(t.Ring) - t.api.RegisterRing(t.ring) + t.API.RegisterRing(t.Ring) - return t.ring, nil + return t.Ring, nil } -func (t *Cortex) initRuntimeConfig(cfg *Config) (services.Service, error) { - if cfg.RuntimeConfig.LoadPath == "" { - cfg.RuntimeConfig.LoadPath = cfg.LimitsConfig.PerTenantOverrideConfig - cfg.RuntimeConfig.ReloadPeriod = cfg.LimitsConfig.PerTenantOverridePeriod +func (t *Cortex) initRuntimeConfig() (services.Service, error) { + if t.Cfg.RuntimeConfig.LoadPath == "" { + t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig + t.Cfg.RuntimeConfig.ReloadPeriod = t.Cfg.LimitsConfig.PerTenantOverridePeriod } - cfg.RuntimeConfig.Loader = loadRuntimeConfig + t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig // make sure to set default limits before we start loading configuration into memory - validation.SetDefaultLimitsForYAMLUnmarshalling(cfg.LimitsConfig) + validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) - serv, err := runtimeconfig.NewRuntimeConfigManager(cfg.RuntimeConfig, prometheus.DefaultRegisterer) - t.runtimeConfig = serv + serv, err := runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer) + t.RuntimeConfig = serv return serv, err } -func (t *Cortex) initOverrides(cfg *Config) (serv services.Service, err error) { - t.overrides, err = validation.NewOverrides(cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig)) +func (t *Cortex) initOverrides() (serv services.Service, err error) { + t.Overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.RuntimeConfig)) // overrides don't have operational state, nor do they need to do anything more in starting/stopping phase, // so there is no need to return any service. return nil, err } -func (t *Cortex) initDistributor(cfg *Config) (serv services.Service, err error) { - cfg.Distributor.DistributorRing.ListenPort = cfg.Server.GRPCListenPort - cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV +func (t *Cortex) initDistributor() (serv services.Service, err error) { + t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := (cfg.Target == All || cfg.Target == Distributor) + canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) - t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.overrides, t.ring, canJoinDistributorsRing) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing) if err != nil { return } - t.api.RegisterDistributor(t.distributor, cfg.Distributor) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor) - return t.distributor, nil + return t.Distributor, nil } -func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { - queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, t.tombstonesLoader, prometheus.DefaultRegisterer) +func (t *Cortex) initQuerier() (serv services.Service, err error) { + queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, t.TombstonesLoader, prometheus.DefaultRegisterer) // if we are not configured for single binary mode then the querier needs to register its paths externally - registerExternally := cfg.Target != All - handler := t.api.RegisterQuerier(queryable, engine, t.distributor, registerExternally, t.tombstonesLoader) + registerExternally := t.Cfg.Target != All + handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader) // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if cfg.Worker.Address == "" && cfg.Target == All { - address := fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + if t.Cfg.Worker.Address == "" && t.Cfg.Target == All { + address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) - cfg.Worker.Address = address + t.Cfg.Worker.Address = address } // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. - worker, err := frontend.NewWorker(cfg.Worker, cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) + worker, err := frontend.NewWorker(t.Cfg.Worker, t.Cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } @@ -210,319 +210,318 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { return worker, nil } -func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { - if cfg.Storage.Engine == storage.StorageEngineChunks { - t.storeQueryable = querier.NewChunkStoreQueryable(cfg.Querier, t.store) +func (t *Cortex) initStoreQueryable() (services.Service, error) { + if t.Cfg.Storage.Engine == storage.StorageEngineChunks { + t.StoreQueryable = querier.NewChunkStoreQueryable(t.Cfg.Querier, t.Store) return nil, nil } - if cfg.Storage.Engine == storage.StorageEngineTSDB && !cfg.TSDB.StoreGatewayEnabled { - storeQueryable, err := querier.NewBlockQueryable(cfg.TSDB, cfg.Server.LogLevel, prometheus.DefaultRegisterer) + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB && !t.Cfg.TSDB.StoreGatewayEnabled { + storeQueryable, err := querier.NewBlockQueryable(t.Cfg.TSDB, t.Cfg.Server.LogLevel, prometheus.DefaultRegisterer) if err != nil { return nil, err } - t.storeQueryable = storeQueryable + t.StoreQueryable = storeQueryable return storeQueryable, nil } - if cfg.Storage.Engine == storage.StorageEngineTSDB && cfg.TSDB.StoreGatewayEnabled { + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB && t.Cfg.TSDB.StoreGatewayEnabled { // When running in single binary, if the blocks sharding is disabled and no custom // store-gateway address has been configured, we can set it to the running process. - if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { - cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + if t.Cfg.Target == All && !t.Cfg.StoreGateway.ShardingEnabled && t.Cfg.Querier.StoreGatewayAddresses == "" { + t.Cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) } - storeQueryable, err := querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + storeQueryable, err := querier.NewBlocksStoreQueryableFromConfig(t.Cfg.Querier, t.Cfg.StoreGateway, t.Cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } - t.storeQueryable = storeQueryable + t.StoreQueryable = storeQueryable return storeQueryable, nil } - return nil, fmt.Errorf("unknown storage engine '%s'", cfg.Storage.Engine) + return nil, fmt.Errorf("unknown storage engine '%s'", t.Cfg.Storage.Engine) } -func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - cfg.Ingester.LifecyclerConfig.ListenPort = cfg.Server.GRPCListenPort - cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB - cfg.Ingester.TSDBConfig = cfg.TSDB - cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels +func (t *Cortex) initIngester() (serv services.Service, err error) { + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Ingester.TSDBEnabled = t.Cfg.Storage.Engine == storage.StorageEngineTSDB + t.Cfg.Ingester.TSDBConfig = t.Cfg.TSDB + t.Cfg.Ingester.ShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels - t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer) if err != nil { return } - t.api.RegisterIngester(t.ingester, cfg.Distributor) + t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor) - return t.ingester, nil + return t.Ingester, nil } -func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) { - t.flusher, err = flusher.New( - cfg.Flusher, - cfg.Ingester, - cfg.IngesterClient, - t.store, +func (t *Cortex) initFlusher() (serv services.Service, err error) { + t.Flusher, err = flusher.New( + t.Cfg.Flusher, + t.Cfg.Ingester, + t.Cfg.IngesterClient, + t.Store, prometheus.DefaultRegisterer, ) if err != nil { return } - return t.flusher, nil + return t.Flusher, nil } -func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { - if cfg.Storage.Engine == storage.StorageEngineTSDB { +func (t *Cortex) initStore() (serv services.Service, err error) { + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB { return nil, nil } - - err = cfg.Schema.Load() + err = t.Cfg.Schema.Load() if err != nil { return } - t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer, t.tombstonesLoader) + t.Store, err = storage.NewStore(t.Cfg.Storage, t.Cfg.ChunkStore, t.Cfg.Schema, t.Overrides, prometheus.DefaultRegisterer, t.TombstonesLoader) if err != nil { return } return services.NewIdleService(nil, func(_ error) error { - t.store.Stop() + t.Store.Stop() return nil }), nil } -func (t *Cortex) initDeleteRequestsStore(cfg *Config) (serv services.Service, err error) { - if !cfg.DataPurgerConfig.Enable { +func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { + if !t.Cfg.DataPurgerConfig.Enable { // until we need to explicitly enable delete series support we need to do create TombstonesLoader without DeleteStore which acts as noop - t.tombstonesLoader = purger.NewTombstonesLoader(nil, nil) + t.TombstonesLoader = purger.NewTombstonesLoader(nil, nil) return } var indexClient chunk.IndexClient - indexClient, err = storage.NewIndexClient(cfg.Storage.DeleteStoreConfig.Store, cfg.Storage, cfg.Schema) + indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema) if err != nil { return } - t.deletesStore, err = purger.NewDeleteStore(cfg.Storage.DeleteStoreConfig, indexClient) + t.DeletesStore, err = purger.NewDeleteStore(t.Cfg.Storage.DeleteStoreConfig, indexClient) if err != nil { return } - t.tombstonesLoader = purger.NewTombstonesLoader(t.deletesStore, prometheus.DefaultRegisterer) + t.TombstonesLoader = purger.NewTombstonesLoader(t.DeletesStore, prometheus.DefaultRegisterer) return } -func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err error) { +func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { // Load the schema only if sharded queries is set. - if cfg.QueryRange.ShardedQueries { - err = cfg.Schema.Load() + if t.Cfg.QueryRange.ShardedQueries { + err = t.Cfg.Schema.Load() if err != nil { return } } - t.frontend, err = frontend.New(cfg.Frontend, util.Logger, prometheus.DefaultRegisterer) + t.Frontend, err = frontend.New(t.Cfg.Frontend, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } tripperware, cache, err := queryrange.NewTripperware( - cfg.QueryRange, + t.Cfg.QueryRange, util.Logger, - t.overrides, + t.Overrides, queryrange.PrometheusCodec, queryrange.PrometheusResponseExtractor{}, - cfg.Schema, + t.Cfg.Schema, promql.EngineOpts{ Logger: util.Logger, Reg: prometheus.DefaultRegisterer, - MaxSamples: cfg.Querier.MaxSamples, - Timeout: cfg.Querier.Timeout, + MaxSamples: t.Cfg.Querier.MaxSamples, + Timeout: t.Cfg.Querier.Timeout, }, - cfg.Querier.QueryIngestersWithin, + t.Cfg.Querier.QueryIngestersWithin, prometheus.DefaultRegisterer, - t.tombstonesLoader, + t.TombstonesLoader, ) if err != nil { return nil, err } - t.cache = cache - t.frontend.Wrap(tripperware) + t.Cache = cache + t.Frontend.Wrap(tripperware) - t.api.RegisterQueryFrontend(t.frontend) + t.API.RegisterQueryFrontend(t.Frontend) return services.NewIdleService(nil, func(_ error) error { - t.frontend.Close() - if t.cache != nil { - t.cache.Stop() - t.cache = nil + t.Frontend.Close() + if t.Cache != nil { + t.Cache.Stop() + t.Cache = nil } return nil }), nil } -func (t *Cortex) initTableManager(cfg *Config) (services.Service, error) { - if cfg.Storage.Engine == storage.StorageEngineTSDB { +func (t *Cortex) initTableManager() (services.Service, error) { + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB { return nil, nil // table manager isn't used in v2 } - err := cfg.Schema.Load() + err := t.Cfg.Schema.Load() if err != nil { return nil, err } // Assume the newest config is the one to use - lastConfig := &cfg.Schema.Configs[len(cfg.Schema.Configs)-1] - - if (cfg.TableManager.ChunkTables.WriteScale.Enabled || - cfg.TableManager.IndexTables.WriteScale.Enabled || - cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || - cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || - cfg.TableManager.ChunkTables.ReadScale.Enabled || - cfg.TableManager.IndexTables.ReadScale.Enabled || - cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || - cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && - cfg.Storage.AWSStorageConfig.Metrics.URL == "" { + lastConfig := &t.Cfg.Schema.Configs[len(t.Cfg.Schema.Configs)-1] + + if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled || + t.Cfg.TableManager.IndexTables.WriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.ReadScale.Enabled || + t.Cfg.TableManager.IndexTables.ReadScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && + t.Cfg.Storage.AWSStorageConfig.Metrics.URL == "" { level.Error(util.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided") os.Exit(1) } - tableClient, err := storage.NewTableClient(lastConfig.IndexType, cfg.Storage) + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage) if err != nil { return nil, err } - bucketClient, err := storage.NewBucketClient(cfg.Storage) + bucketClient, err := storage.NewBucketClient(t.Cfg.Storage) util.CheckFatal("initializing bucket client", err) - t.tableManager, err = chunk.NewTableManager(cfg.TableManager, cfg.Schema, cfg.Ingester.MaxChunkAge, tableClient, bucketClient, prometheus.DefaultRegisterer) - return t.tableManager, err + t.TableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.Schema, t.Cfg.Ingester.MaxChunkAge, tableClient, bucketClient, prometheus.DefaultRegisterer) + return t.TableManager, err } -func (t *Cortex) initRuler(cfg *Config) (serv services.Service, err error) { - cfg.Ruler.Ring.ListenPort = cfg.Server.GRPCListenPort - cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, t.tombstonesLoader, prometheus.DefaultRegisterer) +func (t *Cortex) initRuler() (serv services.Service, err error) { + t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, t.TombstonesLoader, prometheus.DefaultRegisterer) - t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, prometheus.DefaultRegisterer, util.Logger) + t.Ruler, err = ruler.NewRuler(t.Cfg.Ruler, engine, queryable, t.Distributor, prometheus.DefaultRegisterer, util.Logger) if err != nil { return } // Expose HTTP endpoints. - t.api.RegisterRuler(t.ruler, cfg.Ruler.EnableAPI) + t.API.RegisterRuler(t.Ruler, t.Cfg.Ruler.EnableAPI) - return t.ruler, nil + return t.Ruler, nil } -func (t *Cortex) initConfig(cfg *Config) (serv services.Service, err error) { - t.configDB, err = db.New(cfg.Configs.DB) +func (t *Cortex) initConfig() (serv services.Service, err error) { + t.ConfigDB, err = db.New(t.Cfg.Configs.DB) if err != nil { return } - t.configAPI = configAPI.New(t.configDB, cfg.Configs.API) - t.configAPI.RegisterRoutes(t.server.HTTP) + t.ConfigAPI = configAPI.New(t.ConfigDB, t.Cfg.Configs.API) + t.ConfigAPI.RegisterRoutes(t.Server.HTTP) return services.NewIdleService(nil, func(_ error) error { - t.configDB.Close() + t.ConfigDB.Close() return nil }), nil } -func (t *Cortex) initAlertManager(cfg *Config) (serv services.Service, err error) { - t.alertmanager, err = alertmanager.NewMultitenantAlertmanager(&cfg.Alertmanager, util.Logger, prometheus.DefaultRegisterer) +func (t *Cortex) initAlertManager() (serv services.Service, err error) { + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } - t.api.RegisterAlertmanager(t.alertmanager, cfg.Target == AlertManager) - return t.alertmanager, nil + t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager) + return t.Alertmanager, nil } -func (t *Cortex) initCompactor(cfg *Config) (serv services.Service, err error) { - cfg.Compactor.ShardingRing.ListenPort = cfg.Server.GRPCListenPort - cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV +func (t *Cortex) initCompactor() (serv services.Service, err error) { + t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.compactor, err = compactor.NewCompactor(cfg.Compactor, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } // Expose HTTP endpoints. - t.api.RegisterCompactor(t.compactor) - return t.compactor, nil + t.API.RegisterCompactor(t.Compactor) + return t.Compactor, nil } -func (t *Cortex) initStoreGateway(cfg *Config) (serv services.Service, err error) { - if cfg.Storage.Engine != storage.StorageEngineTSDB { +func (t *Cortex) initStoreGateway() (serv services.Service, err error) { + if t.Cfg.Storage.Engine != storage.StorageEngineTSDB { return nil, nil } - cfg.StoreGateway.ShardingRing.ListenPort = cfg.Server.GRPCListenPort - cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.storeGateway, err = storegateway.NewStoreGateway(cfg.StoreGateway, cfg.TSDB, cfg.Server.LogLevel, util.Logger, prometheus.DefaultRegisterer) + t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.TSDB, t.Cfg.Server.LogLevel, util.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } // Expose HTTP endpoints. - t.api.RegisterStoreGateway(t.storeGateway) + t.API.RegisterStoreGateway(t.StoreGateway) - return t.storeGateway, nil + return t.StoreGateway, nil } -func (t *Cortex) initMemberlistKV(cfg *Config) (services.Service, error) { - cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer - cfg.MemberlistKV.Codecs = []codec.Codec{ +func (t *Cortex) initMemberlistKV() (services.Service, error) { + t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer + t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), } - t.memberlistKV = memberlist.NewKVInit(&cfg.MemberlistKV) + t.MemberlistKV = memberlist.NewKVInit(&t.Cfg.MemberlistKV) return services.NewIdleService(nil, func(_ error) error { - t.memberlistKV.Stop() + t.MemberlistKV.Stop() return nil }), nil } -func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) { - if !cfg.DataPurgerConfig.Enable { +func (t *Cortex) initDataPurger() (services.Service, error) { + if !t.Cfg.DataPurgerConfig.Enable { return nil, nil } - storageClient, err := storage.NewObjectClient(cfg.DataPurgerConfig.ObjectStoreType, cfg.Storage) + storageClient, err := storage.NewObjectClient(t.Cfg.DataPurgerConfig.ObjectStoreType, t.Cfg.Storage) if err != nil { return nil, err } - t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient, prometheus.DefaultRegisterer) + t.DataPurger, err = purger.NewDataPurger(t.Cfg.DataPurgerConfig, t.DeletesStore, t.Store, storageClient, prometheus.DefaultRegisterer) if err != nil { return nil, err } - t.api.RegisterPurger(t.deletesStore) + t.API.RegisterPurger(t.DeletesStore) - return t.dataPurger, nil + return t.DataPurger, nil } type module struct { deps []ModuleName // service for this module (can return nil) - service func(t *Cortex, cfg *Config) (services.Service, error) + service func(t *Cortex) (services.Service, error) // service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end // (can return nil) - wrappedService func(t *Cortex, cfg *Config) (services.Service, error) + wrappedService func(t *Cortex) (services.Service, error) } var modules = map[ModuleName]module{ diff --git a/pkg/cortex/status.go b/pkg/cortex/status.go index dec9a0f1e9b..b7bfe9617ba 100644 --- a/pkg/cortex/status.go +++ b/pkg/cortex/status.go @@ -10,7 +10,7 @@ func (t *Cortex) servicesHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") // TODO: this could be extended to also print sub-services, if given service has any - for mod, s := range t.serviceMap { + for mod, s := range t.ServiceMap { if s != nil { fmt.Fprintf(w, "%v => %v\n", mod, s.State()) }