From 6c2e3c3bbaa997e1599a3dfce45a84ae0dd90caf Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Sat, 3 Oct 2020 13:06:20 +0200 Subject: [PATCH 01/13] Add an ability to load an arbitrary list of Cortex modules Signed-off-by: Igor Novgorodov --- CHANGELOG.md | 3 ++ docs/configuration/config-file-reference.md | 6 ++- pkg/cortex/cortex.go | 51 +++++++++++++++------ pkg/cortex/cortex_test.go | 20 ++++---- pkg/cortex/modules.go | 13 +++--- pkg/cortex/server_service.go | 1 + pkg/util/modules/modules.go | 48 ++++++++++++++----- pkg/util/modules/modules_test.go | 30 +++++++++--- 8 files changed, 123 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee66431a17a..3360d6b66b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## master / unreleased +* [FEATURE] Implement an ability to load multiple Cortex modules using a comma-separated list. #3272 + - Should be backwards compatible with the current behavior. + - Target list like 'all,compactor' can be used to load the single-binary modules plus some other modules. * [CHANGE] Blocks storage: update the default HTTP configuration values for the S3 client to the upstream Thanos default values. #3244 - `-blocks-storage.s3.http.idle-conn-timeout` is set 90 seconds. - `-blocks-storage.s3.http.response-header-timeout` is set to 2 minutes. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 703c345d9e5..b44f461d5b2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -52,8 +52,10 @@ Where default_value is the value to use if the environment variable is undefined ### Supported contents and default values of the config file ```yaml -# The Cortex module to run. Use "-modules" command line flag to get a list of -# available modules, and to see which modules are included in "All". +# List of Cortex modules to load, comma separated. The alias 'all' can be used +# in the list to load a number of core modules and will enable single-binary +# mode. Use '-modules' command line flag to get a list of available modules, and +# to see which modules are included in 'all'. # CLI flag: -target [target: | default = "all"] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 0704f39f8eb..997429c8aa3 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "reflect" + "strings" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -71,10 +72,11 @@ import ( // Config is the root config for Cortex. type Config struct { - Target string `yaml:"target"` - AuthEnabled bool `yaml:"auth_enabled"` - PrintConfig bool `yaml:"-"` - HTTPPrefix string `yaml:"http_prefix"` + Target string `yaml:"target"` + Modules map[string]bool `yaml:"-"` + AuthEnabled bool `yaml:"auth_enabled"` + PrintConfig bool `yaml:"-"` + HTTPPrefix string `yaml:"http_prefix"` API api.Config `yaml:"api"` Server server.Config `yaml:"server"` @@ -109,7 +111,11 @@ type Config struct { func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.MetricsNamespace = "cortex" c.Server.ExcludeRequestInLog = true - f.StringVar(&c.Target, "target", All, "The Cortex module to run. Use \"-modules\" command line flag to get a list of available modules, and to see which modules are included in \"All\".") + + f.StringVar(&c.Target, "target", All, "List of Cortex modules to load, comma separated. "+ + "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ + "Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.") + f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.") @@ -277,6 +283,12 @@ func New(cfg Config) (*Cortex, error) { os.Exit(0) } + // Parse a comma-separated list of modules to load + cfg.Modules = map[string]bool{} + for _, n := range strings.Split(cfg.Target, ",") { + cfg.Modules[n] = true + } + // Don't check auth header on TransferChunks, as we weren't originally // sending it and this could cause transfers to fail on update. // @@ -305,20 +317,31 @@ func (t *Cortex) setupThanosTracing() { t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor) } -// Run starts Cortex running, and blocks until a Cortex stops. -func (t *Cortex) Run() error { - if !t.ModuleManager.IsUserVisibleModule(t.Cfg.Target) { - level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", t.Cfg.Target) - } +// InitModules initializes required Cortex modules +func (t *Cortex) InitModules() error { + for module := range t.Cfg.Modules { + if !t.ModuleManager.IsUserVisibleModule(module) { + level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module) + } - serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target) - if err != nil { - return err + err := t.ModuleManager.InitModuleServices(module) + if err != nil { + return err + } } - t.ServiceMap = serviceMap + t.ServiceMap = t.ModuleManager.GetServicesMap() t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) + return nil +} + +// Run starts Cortex running, and blocks until a Cortex stops. +func (t *Cortex) Run() error { + if err := t.InitModules(); err != nil { + return err + } + // get all services, create service manager and tell it to start servs := []services.Service(nil) for _, s := range t.ServiceMap { diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 1c9070a996b..df67901df8b 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -1,6 +1,7 @@ package cortex import ( + "fmt" "net/url" "testing" @@ -68,24 +69,27 @@ func TestCortex(t *testing.T) { }, }, }, - Target: All, + Target: fmt.Sprintf("%s,%s", All, Compactor), } c, err := New(cfg) require.NoError(t, err) - serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target) + err = c.InitModules() require.NoError(t, err) - require.NotNil(t, serviceMap) + require.NotNil(t, c.ServiceMap) - for m, s := range serviceMap { + for m, s := range c.ServiceMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, serviceMap[Server]) - require.NotNil(t, serviceMap[IngesterService]) - require.NotNil(t, serviceMap[Ring]) - require.NotNil(t, serviceMap[DistributorService]) + require.NotNil(t, c.ServiceMap[Server]) + require.NotNil(t, c.ServiceMap[IngesterService]) + require.NotNil(t, c.ServiceMap[Ring]) + require.NotNil(t, c.ServiceMap[DistributorService]) + + // check that compactor is configured which is not part of Target=All + require.NotNil(t, c.ServiceMap[Compactor]) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index b6e5ffb20c5..581551481b3 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay // No need to report if this field isn't going to be used. - if t.Cfg.Target == All || t.Cfg.Target == Ruler { + if t.ModuleManager.IsModuleRegistered(Ruler) { flagext.DeprecatedFlagsUsed.Inc() level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.") } @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) + canJoinDistributorsRing := t.ModuleManager.IsModuleRegistered(Distributor) t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer) if err != nil { @@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { }, []string{"method", "route"}) // if we are not configured for single binary mode then the querier needs to register its paths externally - registerExternally := t.Cfg.Target != All + registerExternally := !t.Cfg.Modules[All] handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests) // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if t.Cfg.Worker.Address == "" && t.Cfg.Target == All { + if t.Cfg.Worker.Address == "" && t.ModuleManager.IsModuleRegistered(All) { address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) t.Cfg.Worker.Address = address @@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l case storage.StorageEngineBlocks: // When running in single binary, if the blocks sharding is disabled and no custom // store-gateway address has been configured, we can set it to the running process. - if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { + if cfg.Modules[All] && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) } @@ -510,13 +510,12 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() { + if t.Cfg.Modules[All] && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.") return } t.RulerStorage, err = ruler.NewRuleStorage(t.Cfg.Ruler.StoreConfig, rules.FileLoader{}) - return } diff --git a/pkg/cortex/server_service.go b/pkg/cortex/server_service.go index c5a84942e81..7a565fbfb01 100644 --- a/pkg/cortex/server_service.go +++ b/pkg/cortex/server_service.go @@ -54,6 +54,7 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S return services.NewBasicService(nil, runFn, stoppingFn) } +// DisableSignalHandling puts a dummy signal handler func DisableSignalHandling(config *server.Config) { config.SignalHandler = make(ignoreSignalHandler) } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 1af768016bf..fb3082fd684 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -25,6 +25,12 @@ type module struct { // in the right order of dependencies. type Manager struct { modules map[string]*module + + // Modules that are already initialized + modulesLoaded map[string]bool + + // Service map + servicesMap map[string]services.Service } // UserInvisibleModule is an option for `RegisterModule` that marks module not visible to user. Modules are user visible by default. @@ -35,7 +41,9 @@ func UserInvisibleModule(m *module) { // NewManager creates a new Manager func NewManager() *Manager { return &Manager{ - modules: make(map[string]*module), + modules: make(map[string]*module), + modulesLoaded: make(map[string]bool), + servicesMap: make(map[string]services.Service), } } @@ -67,18 +75,21 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { // InitModuleServices initialises the target module by initialising all its dependencies // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. -func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { - if _, ok := m.modules[target]; !ok { - return nil, fmt.Errorf("unrecognised module name: %s", target) +func (m *Manager) InitModuleServices(name string) error { + if !m.IsModuleRegistered(name) { + return fmt.Errorf("unrecognised module name: %s", name) } - servicesMap := map[string]services.Service{} - // initialize all of our dependencies first - deps := m.orderedDeps(target) - deps = append(deps, target) // lastly, initialize the requested module + deps := m.orderedDeps(name) + deps = append(deps, name) // lastly, initialize the requested module for ix, n := range deps { + // Skip already loaded modules + if m.modulesLoaded[n] { + continue + } + mod := m.modules[n] var serv services.Service @@ -86,22 +97,24 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service if mod.initFn != nil { s, err := mod.initFn() if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) + return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } if s != nil { // We pass servicesMap, which isn't yet complete. By the time service starts, // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) + serv = newModuleServiceWrapper(m.servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) } } if serv != nil { - servicesMap[n] = serv + m.servicesMap[n] = serv } + + m.modulesLoaded[n] = true } - return servicesMap, nil + return nil } // UserVisibleModuleNames gets list of module names that are @@ -119,6 +132,11 @@ func (m *Manager) UserVisibleModuleNames() []string { return result } +// GetServicesMap returns services map +func (m *Manager) GetServicesMap() map[string]services.Service { + return m.servicesMap +} + // IsUserVisibleModule check if given module is public or not. Returns true // if and only if the given module is registered and is public. func (m *Manager) IsUserVisibleModule(mod string) bool { @@ -131,6 +149,12 @@ func (m *Manager) IsUserVisibleModule(mod string) bool { return false } +// IsModuleRegistered returns true if given module was registered +func (m *Manager) IsModuleRegistered(mod string) bool { + _, ok := m.modules[mod] + return ok +} + // listDeps recursively gets a list of dependencies for a passed moduleName func (m *Manager) listDeps(mod string) []string { deps := m.modules[mod].deps diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 8734c3c9b05..403a8b1057c 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -1,6 +1,7 @@ package modules import ( + "errors" "fmt" "testing" @@ -10,7 +11,9 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" ) -func mockInitFunc() (services.Service, error) { return nil, nil } +func mockInitFunc() (services.Service, error) { return services.NewIdleService(nil, nil), nil } + +func mockInitFuncFail() (services.Service, error) { return nil, errors.New("Error") } func TestDependencies(t *testing.T) { var testModules = map[string]module{ @@ -25,6 +28,10 @@ func TestDependencies(t *testing.T) { "serviceC": { initFn: mockInitFunc, }, + + "serviceD": { + initFn: mockInitFuncFail, + }, } mm := NewManager() @@ -39,13 +46,24 @@ func TestDependencies(t *testing.T) { require.Len(t, invDeps, 1) assert.Equal(t, invDeps[0], "serviceB") - svcs, err := mm.InitModuleServices("serviceC") - assert.NotNil(t, svcs) + // Test unknown module + err := mm.InitModuleServices("service_unknown") + assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) + assert.Empty(t, mm.GetServicesMap()) + + // Test init failure + err = mm.InitModuleServices("serviceD") + assert.Error(t, err) + assert.Empty(t, mm.GetServicesMap()) + + err = mm.InitModuleServices("serviceC") assert.NoError(t, err) + assert.Equal(t, 3, len(mm.GetServicesMap())) - svcs, err = mm.InitModuleServices("service_unknown") - assert.Nil(t, svcs) - assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) + // Test loading of the module second time (should be noop) + err = mm.InitModuleServices("serviceC") + assert.NoError(t, err) + assert.Equal(t, 3, len(mm.GetServicesMap())) } func TestRegisterModuleDefaultsToUserVisible(t *testing.T) { From e3b67b2504b419ab0eccfab35f481b2c792f56bb Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Sat, 3 Oct 2020 19:56:50 +0200 Subject: [PATCH 02/13] Fix module loading logic Signed-off-by: Igor Novgorodov --- pkg/cortex/modules.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 581551481b3..cf1dbb37a63 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay // No need to report if this field isn't going to be used. - if t.ModuleManager.IsModuleRegistered(Ruler) { + if t.Cfg.Modules[Ruler] || t.Cfg.Modules[All] { flagext.DeprecatedFlagsUsed.Inc() level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.") } @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := t.ModuleManager.IsModuleRegistered(Distributor) + canJoinDistributorsRing := t.Cfg.Modules[Distributor] || t.Cfg.Modules[All] t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer) if err != nil { @@ -227,7 +227,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if t.Cfg.Worker.Address == "" && t.ModuleManager.IsModuleRegistered(All) { + if t.Cfg.Worker.Address == "" && t.Cfg.Modules[All] { address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) t.Cfg.Worker.Address = address From 3fb005c92730e84cfe28562cbb7d3a14419dd8a0 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 7 Oct 2020 16:57:10 +0200 Subject: [PATCH 03/13] Improve multiple module loading Signed-off-by: Igor Novgorodov --- CHANGELOG.md | 4 +--- cmd/cortex/main.go | 7 +++++- pkg/cortex/cortex.go | 41 ++++++++++++++------------------ pkg/cortex/cortex_test.go | 5 ++-- pkg/cortex/modules.go | 14 +++++------ pkg/util/modules/modules.go | 39 +++++++++++++++--------------- pkg/util/modules/modules_test.go | 16 ++++++------- tools/doc-generator/parser.go | 2 ++ 8 files changed, 64 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3360d6b66b2..f1f9453954f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,6 @@ ## master / unreleased -* [FEATURE] Implement an ability to load multiple Cortex modules using a comma-separated list. #3272 - - Should be backwards compatible with the current behavior. - - Target list like 'all,compactor' can be used to load the single-binary modules plus some other modules. * [CHANGE] Blocks storage: update the default HTTP configuration values for the S3 client to the upstream Thanos default values. #3244 - `-blocks-storage.s3.http.idle-conn-timeout` is set 90 seconds. - `-blocks-storage.s3.http.response-header-timeout` is set to 2 minutes. @@ -45,6 +42,7 @@ - `POST /ingester/push` * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257 * [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217 +* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3272 * [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244 - `-blocks-storage.s3.http.idle-conn-timeout` - `-blocks-storage.s3.http.response-header-timeout` diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index ec72a041207..8955696326b 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -146,8 +146,13 @@ func main() { // In testing mode skip JAEGER setup to avoid panic due to // "duplicate metrics collector registration attempted" if !testMode { + name := "cortex" + if len(cfg.Target) == 1 { + name += "-" + cfg.Target[0] + } + // Setting the environment variable JAEGER_AGENT_HOST enables tracing. - if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil { + if trace, err := tracing.NewFromEnv(name); err != nil { level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error()) } else { defer trace.Close() diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 997429c8aa3..13a8ae02b8b 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "reflect" - "strings" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -72,11 +71,10 @@ import ( // Config is the root config for Cortex. type Config struct { - Target string `yaml:"target"` - Modules map[string]bool `yaml:"-"` - AuthEnabled bool `yaml:"auth_enabled"` - PrintConfig bool `yaml:"-"` - HTTPPrefix string `yaml:"http_prefix"` + Target flagext.StringSliceCSV `yaml:"target"` + AuthEnabled bool `yaml:"auth_enabled"` + PrintConfig bool `yaml:"-"` + HTTPPrefix string `yaml:"http_prefix"` API api.Config `yaml:"api"` Server server.Config `yaml:"server"` @@ -112,7 +110,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.MetricsNamespace = "cortex" c.Server.ExcludeRequestInLog = true - f.StringVar(&c.Target, "target", All, "List of Cortex modules to load, comma separated. "+ + c.Target.Set(All) + + f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "List of Cortex modules to load, comma separated. "+ "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ "Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.") @@ -209,6 +209,10 @@ func (c *Config) Validate(log log.Logger) error { return nil } +func (c *Config) isModuleEnabled(m string) bool { + return util.StringsContain(c.Target, m) +} + // validateYAMLEmptyNodes ensure that no empty node has been specified in the YAML config file. // When an empty node is defined in YAML, the YAML parser sets the whole struct to its zero value // and so we loose all default values. It's very difficult to detect this case for the user, so we @@ -283,12 +287,6 @@ func New(cfg Config) (*Cortex, error) { os.Exit(0) } - // Parse a comma-separated list of modules to load - cfg.Modules = map[string]bool{} - for _, n := range strings.Split(cfg.Target, ",") { - cfg.Modules[n] = true - } - // Don't check auth header on TransferChunks, as we weren't originally // sending it and this could cause transfers to fail on update. // @@ -317,28 +315,25 @@ func (t *Cortex) setupThanosTracing() { t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor) } -// InitModules initializes required Cortex modules -func (t *Cortex) InitModules() error { - for module := range t.Cfg.Modules { +func (t *Cortex) initModules() (err error) { + for _, module := range t.Cfg.Target { if !t.ModuleManager.IsUserVisibleModule(module) { level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module) } + } - err := t.ModuleManager.InitModuleServices(module) - if err != nil { - return err - } + t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) + if err != nil { + return err } - t.ServiceMap = t.ModuleManager.GetServicesMap() t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) - return nil } // Run starts Cortex running, and blocks until a Cortex stops. func (t *Cortex) Run() error { - if err := t.InitModules(); err != nil { + if err := t.initModules(); err != nil { return err } diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index df67901df8b..e795682e883 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -69,13 +69,14 @@ func TestCortex(t *testing.T) { }, }, }, - Target: fmt.Sprintf("%s,%s", All, Compactor), } + cfg.Target.Set(fmt.Sprintf("%s,%s", All, Compactor)) + c, err := New(cfg) require.NoError(t, err) - err = c.InitModules() + err = c.initModules() require.NoError(t, err) require.NotNil(t, c.ServiceMap) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index cf1dbb37a63..226a7cc8985 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay // No need to report if this field isn't going to be used. - if t.Cfg.Modules[Ruler] || t.Cfg.Modules[All] { + if t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(All) { flagext.DeprecatedFlagsUsed.Inc() level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.") } @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := t.Cfg.Modules[Distributor] || t.Cfg.Modules[All] + canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All) t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer) if err != nil { @@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { }, []string{"method", "route"}) // if we are not configured for single binary mode then the querier needs to register its paths externally - registerExternally := !t.Cfg.Modules[All] + registerExternally := !t.Cfg.isModuleEnabled(All) handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests) // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if t.Cfg.Worker.Address == "" && t.Cfg.Modules[All] { + if t.Cfg.Worker.Address == "" && t.Cfg.isModuleEnabled(All) { address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) t.Cfg.Worker.Address = address @@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l case storage.StorageEngineBlocks: // When running in single binary, if the blocks sharding is disabled and no custom // store-gateway address has been configured, we can set it to the running process. - if cfg.Modules[All] && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { + if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) } @@ -510,7 +510,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - if t.Cfg.Modules[All] && t.Cfg.Ruler.StoreConfig.IsDefaults() { + if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.") return } @@ -573,7 +573,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { return } - t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager, t.Cfg.Alertmanager.EnableAPI) + t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI) return t.Alertmanager, nil } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index fb3082fd684..3352bab37bb 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -27,7 +27,7 @@ type Manager struct { modules map[string]*module // Modules that are already initialized - modulesLoaded map[string]bool + modulesInitialized map[string]bool // Service map servicesMap map[string]services.Service @@ -41,9 +41,9 @@ func UserInvisibleModule(m *module) { // NewManager creates a new Manager func NewManager() *Manager { return &Manager{ - modules: make(map[string]*module), - modulesLoaded: make(map[string]bool), - servicesMap: make(map[string]services.Service), + modules: make(map[string]*module), + modulesInitialized: make(map[string]bool), + servicesMap: make(map[string]services.Service), } } @@ -72,11 +72,21 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { return nil } -// InitModuleServices initialises the target module by initialising all its dependencies +// InitModuleServices initialises given modules by initialising all their dependencies // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. -func (m *Manager) InitModuleServices(name string) error { - if !m.IsModuleRegistered(name) { +func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) { + for _, module := range modules { + if err := m.initModule(module); err != nil { + return nil, err + } + } + + return m.servicesMap, nil +} + +func (m *Manager) initModule(name string) error { + if _, ok := m.modules[name]; !ok { return fmt.Errorf("unrecognised module name: %s", name) } @@ -86,7 +96,7 @@ func (m *Manager) InitModuleServices(name string) error { for ix, n := range deps { // Skip already loaded modules - if m.modulesLoaded[n] { + if m.modulesInitialized[n] { continue } @@ -111,7 +121,7 @@ func (m *Manager) InitModuleServices(name string) error { m.servicesMap[n] = serv } - m.modulesLoaded[n] = true + m.modulesInitialized[n] = true } return nil @@ -132,11 +142,6 @@ func (m *Manager) UserVisibleModuleNames() []string { return result } -// GetServicesMap returns services map -func (m *Manager) GetServicesMap() map[string]services.Service { - return m.servicesMap -} - // IsUserVisibleModule check if given module is public or not. Returns true // if and only if the given module is registered and is public. func (m *Manager) IsUserVisibleModule(mod string) bool { @@ -149,12 +154,6 @@ func (m *Manager) IsUserVisibleModule(mod string) bool { return false } -// IsModuleRegistered returns true if given module was registered -func (m *Manager) IsModuleRegistered(mod string) bool { - _, ok := m.modules[mod] - return ok -} - // listDeps recursively gets a list of dependencies for a passed moduleName func (m *Manager) listDeps(mod string) []string { deps := m.modules[mod].deps diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 403a8b1057c..13b545045ac 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -47,23 +47,23 @@ func TestDependencies(t *testing.T) { assert.Equal(t, invDeps[0], "serviceB") // Test unknown module - err := mm.InitModuleServices("service_unknown") + svc, err := mm.InitModuleServices("service_unknown") assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) - assert.Empty(t, mm.GetServicesMap()) + assert.Empty(t, svc) // Test init failure - err = mm.InitModuleServices("serviceD") + svc, err = mm.InitModuleServices("serviceD") assert.Error(t, err) - assert.Empty(t, mm.GetServicesMap()) + assert.Empty(t, svc) - err = mm.InitModuleServices("serviceC") + svc, err = mm.InitModuleServices("serviceC") assert.NoError(t, err) - assert.Equal(t, 3, len(mm.GetServicesMap())) + assert.Equal(t, 3, len(svc)) // Test loading of the module second time (should be noop) - err = mm.InitModuleServices("serviceC") + svc, err = mm.InitModuleServices("serviceC") assert.NoError(t, err) - assert.Equal(t, 3, len(mm.GetServicesMap())) + assert.Equal(t, 3, len(svc)) } func TestRegisterModuleDefaultsToUserVisible(t *testing.T) { diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index f0480412d14..2beadb2627e 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -241,6 +241,8 @@ func getFieldType(t reflect.Type) (string, error) { return "duration", nil case "cortex.moduleName": return "string", nil + case "flagext.StringSliceCSV": + return "string", nil } // Fallback to auto-detection of built-in data types From 12289125d069d402879e412eca685221de876ca9 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 7 Oct 2020 17:05:20 +0200 Subject: [PATCH 04/13] Add forgotten files Signed-off-by: Igor Novgorodov --- pkg/util/flagext/stringslicecsv.go | 33 ++++++++++++++++++++++++ pkg/util/flagext/stringslicecsv_test.go | 34 +++++++++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 pkg/util/flagext/stringslicecsv.go create mode 100644 pkg/util/flagext/stringslicecsv_test.go diff --git a/pkg/util/flagext/stringslicecsv.go b/pkg/util/flagext/stringslicecsv.go new file mode 100644 index 00000000000..47ccd54ca08 --- /dev/null +++ b/pkg/util/flagext/stringslicecsv.go @@ -0,0 +1,33 @@ +package flagext + +import "strings" + +// StringSliceCSV is a slice of strings that is parsed from a comma-separated string +// It implements flag.Value and yaml Marshalers +type StringSliceCSV []string + +// String implements flag.Value +func (v StringSliceCSV) String() string { + return strings.Join(v, ",") +} + +// Set implements flag.Value +func (v *StringSliceCSV) Set(s string) error { + *v = strings.Split(s, ",") + return nil +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + return v.Set(s) +} + +// MarshalYAML implements yaml.Marshaler. +func (v StringSliceCSV) MarshalYAML() (interface{}, error) { + return v.String(), nil +} diff --git a/pkg/util/flagext/stringslicecsv_test.go b/pkg/util/flagext/stringslicecsv_test.go new file mode 100644 index 00000000000..67e50ba1df4 --- /dev/null +++ b/pkg/util/flagext/stringslicecsv_test.go @@ -0,0 +1,34 @@ +package flagext + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func Test_StringSliceCSV(t *testing.T) { + type TestStruct struct { + CSV StringSliceCSV `yaml:"csv"` + } + + var testStruct TestStruct + s := "a,b,c,d" + assert.Nil(t, testStruct.CSV.Set(s)) + + assert.Equal(t, []string{"a", "b", "c", "d"}, []string(testStruct.CSV)) + assert.Equal(t, s, testStruct.CSV.String()) + + expected := []byte(`csv: a,b,c,d +`) + + actual, err := yaml.Marshal(testStruct) + assert.Nil(t, err) + assert.Equal(t, expected, actual) + + var testStruct2 TestStruct + + err = yaml.Unmarshal(expected, &testStruct2) + assert.Nil(t, err) + assert.Equal(t, testStruct, testStruct2) +} From b83369b4729c314b0a525eaebad31abe1d66d17e Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 7 Oct 2020 17:15:16 +0200 Subject: [PATCH 05/13] Fix linter errors Signed-off-by: Igor Novgorodov --- pkg/cortex/cortex.go | 4 +++- pkg/cortex/cortex_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 13a8ae02b8b..ff504a48eab 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -110,7 +110,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.MetricsNamespace = "cortex" c.Server.ExcludeRequestInLog = true - c.Target.Set(All) + // Set the default module list to 'all' + // Make linter happy + c.Target.Set(All) //nolint:errcheck f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "List of Cortex modules to load, comma separated. "+ "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index e795682e883..69ba7c1c695 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -71,7 +71,7 @@ func TestCortex(t *testing.T) { }, } - cfg.Target.Set(fmt.Sprintf("%s,%s", All, Compactor)) + cfg.Target.Set(fmt.Sprintf("%s,%s", All, Compactor)) //nolint:errcheck c, err := New(cfg) require.NoError(t, err) From 53190a148b42aa50655795f3485b891514944855 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 7 Oct 2020 21:21:42 +0200 Subject: [PATCH 06/13] Reduce state in ModuleManager Signed-off-by: Igor Novgorodov --- pkg/util/modules/modules.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 3352bab37bb..6d8aef534b0 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -25,12 +25,6 @@ type module struct { // in the right order of dependencies. type Manager struct { modules map[string]*module - - // Modules that are already initialized - modulesInitialized map[string]bool - - // Service map - servicesMap map[string]services.Service } // UserInvisibleModule is an option for `RegisterModule` that marks module not visible to user. Modules are user visible by default. @@ -41,9 +35,7 @@ func UserInvisibleModule(m *module) { // NewManager creates a new Manager func NewManager() *Manager { return &Manager{ - modules: make(map[string]*module), - modulesInitialized: make(map[string]bool), - servicesMap: make(map[string]services.Service), + modules: make(map[string]*module), } } @@ -76,16 +68,19 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) { + svcsMap := map[string]services.Service{} + initMap := map[string]bool{} + for _, module := range modules { - if err := m.initModule(module); err != nil { + if err := m.initModule(module, initMap, svcsMap); err != nil { return nil, err } } - return m.servicesMap, nil + return svcsMap, nil } -func (m *Manager) initModule(name string) error { +func (m *Manager) initModule(name string, initMap map[string]bool, svcsMap map[string]services.Service) error { if _, ok := m.modules[name]; !ok { return fmt.Errorf("unrecognised module name: %s", name) } @@ -95,8 +90,8 @@ func (m *Manager) initModule(name string) error { deps = append(deps, name) // lastly, initialize the requested module for ix, n := range deps { - // Skip already loaded modules - if m.modulesInitialized[n] { + // Skip already initialized modules + if initMap[n] { continue } @@ -113,15 +108,15 @@ func (m *Manager) initModule(name string) error { if s != nil { // We pass servicesMap, which isn't yet complete. By the time service starts, // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(m.servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) + serv = newModuleServiceWrapper(svcsMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) } } if serv != nil { - m.servicesMap[n] = serv + svcsMap[n] = serv } - m.modulesInitialized[n] = true + initMap[n] = true } return nil From 5d98caab141cb8cbdd4efea281953e8a194a52c0 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Thu, 8 Oct 2020 11:57:47 +0200 Subject: [PATCH 07/13] Fix pkg/cortex & pkg/util/modules tests, make initModules() stateless, update docs Signed-off-by: Igor Novgorodov --- docs/configuration/config-file-reference.md | 8 ++++---- pkg/cortex/cortex.go | 17 ++++++++--------- pkg/cortex/cortex_test.go | 19 +++++++++---------- pkg/util/modules/modules_test.go | 10 ++++++++-- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b44f461d5b2..497d71f991f 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -52,10 +52,10 @@ Where default_value is the value to use if the environment variable is undefined ### Supported contents and default values of the config file ```yaml -# List of Cortex modules to load, comma separated. The alias 'all' can be used -# in the list to load a number of core modules and will enable single-binary -# mode. Use '-modules' command line flag to get a list of available modules, and -# to see which modules are included in 'all'. +# Comma-separated list of Cortex modules to load. The alias 'all' can be used in +# the list to load a number of core modules and will enable single-binary mode. +# Use '-modules' command line flag to get a list of available modules, and to +# see which modules are included in 'all'. # CLI flag: -target [target: | default = "all"] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index ff504a48eab..b2bf1a8e853 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -111,10 +111,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.ExcludeRequestInLog = true // Set the default module list to 'all' - // Make linter happy - c.Target.Set(All) //nolint:errcheck + c.Target = []string{All} - f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "List of Cortex modules to load, comma separated. "+ + f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "Comma-separated list of Cortex modules to load. "+ "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ "Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.") @@ -317,25 +316,25 @@ func (t *Cortex) setupThanosTracing() { t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor) } -func (t *Cortex) initModules() (err error) { +func (t *Cortex) initModules() (svcMap map[string]services.Service, err error) { for _, module := range t.Cfg.Target { if !t.ModuleManager.IsUserVisibleModule(module) { level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module) } } - t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) + svcMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) if err != nil { - return err + return } t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) - return nil + return } // Run starts Cortex running, and blocks until a Cortex stops. -func (t *Cortex) Run() error { - if err := t.initModules(); err != nil { +func (t *Cortex) Run() (err error) { + if t.ServiceMap, err = t.initModules(); err != nil { return err } diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 69ba7c1c695..1f7bcdbfae9 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -1,7 +1,6 @@ package cortex import ( - "fmt" "net/url" "testing" @@ -71,26 +70,26 @@ func TestCortex(t *testing.T) { }, } - cfg.Target.Set(fmt.Sprintf("%s,%s", All, Compactor)) //nolint:errcheck + cfg.Target = []string{All, Compactor} c, err := New(cfg) require.NoError(t, err) - err = c.initModules() + svcMap, err := c.initModules() require.NoError(t, err) - require.NotNil(t, c.ServiceMap) + require.NotNil(t, svcMap) - for m, s := range c.ServiceMap { + for m, s := range svcMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, c.ServiceMap[Server]) - require.NotNil(t, c.ServiceMap[IngesterService]) - require.NotNil(t, c.ServiceMap[Ring]) - require.NotNil(t, c.ServiceMap[DistributorService]) + require.NotNil(t, svcMap[Server]) + require.NotNil(t, svcMap[IngesterService]) + require.NotNil(t, svcMap[Ring]) + require.NotNil(t, svcMap[DistributorService]) // check that compactor is configured which is not part of Target=All - require.NotNil(t, c.ServiceMap[Compactor]) + require.NotNil(t, svcMap[Compactor]) } diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 13b545045ac..f62fdd62a99 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -56,14 +56,20 @@ func TestDependencies(t *testing.T) { assert.Error(t, err) assert.Empty(t, svc) + // Test loading several modules + svc, err = mm.InitModuleServices("serviceA", "serviceB") + assert.Nil(t, err) + assert.Equal(t, 2, len(svc)) + svc, err = mm.InitModuleServices("serviceC") assert.NoError(t, err) assert.Equal(t, 3, len(svc)) - // Test loading of the module second time (should be noop) - svc, err = mm.InitModuleServices("serviceC") + // Test loading of the module second time - should produce the same set of services, but new instances. + svc2, err := mm.InitModuleServices("serviceC") assert.NoError(t, err) assert.Equal(t, 3, len(svc)) + assert.NotEqual(t, svc, svc2) } func TestRegisterModuleDefaultsToUserVisible(t *testing.T) { From a3613cad6d4d0788c39f65dc85fad287a14e0b15 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Thu, 8 Oct 2020 12:01:05 +0200 Subject: [PATCH 08/13] Fix missed mess with &c.Target pointer cast Signed-off-by: Igor Novgorodov --- pkg/cortex/cortex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index b2bf1a8e853..067cd5d04bb 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -113,7 +113,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { // Set the default module list to 'all' c.Target = []string{All} - f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "Comma-separated list of Cortex modules to load. "+ + f.Var(&c.Target, "target", "Comma-separated list of Cortex modules to load. "+ "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ "Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.") From 4bb0dadf9ada9422a7074d674846f13fa1f758a8 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Thu, 8 Oct 2020 17:57:03 +0200 Subject: [PATCH 09/13] Cosmetic changes to minimize diff Signed-off-by: Igor Novgorodov --- pkg/cortex/cortex.go | 16 +++++----------- pkg/cortex/cortex_test.go | 22 ++++++++++++---------- pkg/util/modules/modules.go | 12 ++++++------ 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 067cd5d04bb..336232f26cf 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -316,27 +316,21 @@ func (t *Cortex) setupThanosTracing() { t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor) } -func (t *Cortex) initModules() (svcMap map[string]services.Service, err error) { +// Run starts Cortex running, and blocks until a Cortex stops. +func (t *Cortex) Run() error { for _, module := range t.Cfg.Target { if !t.ModuleManager.IsUserVisibleModule(module) { level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module) } } - svcMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) + var err error + t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) if err != nil { - return + return err } t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) - return -} - -// Run starts Cortex running, and blocks until a Cortex stops. -func (t *Cortex) Run() (err error) { - if t.ServiceMap, err = t.initModules(); err != nil { - return err - } // get all services, create service manager and tell it to start servs := []services.Service(nil) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 1f7bcdbfae9..8f0968835ec 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -1,6 +1,7 @@ package cortex import ( + "log" "net/url" "testing" @@ -68,28 +69,29 @@ func TestCortex(t *testing.T) { }, }, }, - } - cfg.Target = []string{All, Compactor} + Target: []string{All, Compactor}, + } c, err := New(cfg) require.NoError(t, err) - svcMap, err := c.initModules() + serviceMap, err := c.ModuleManager.InitModuleServices(cfg.Target...) require.NoError(t, err) - require.NotNil(t, svcMap) + require.NotNil(t, serviceMap) + log.Println(serviceMap) - for m, s := range svcMap { + for m, s := range serviceMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, svcMap[Server]) - require.NotNil(t, svcMap[IngesterService]) - require.NotNil(t, svcMap[Ring]) - require.NotNil(t, svcMap[DistributorService]) + require.NotNil(t, serviceMap[Server]) + require.NotNil(t, serviceMap[IngesterService]) + require.NotNil(t, serviceMap[Ring]) + require.NotNil(t, serviceMap[DistributorService]) // check that compactor is configured which is not part of Target=All - require.NotNil(t, svcMap[Compactor]) + require.NotNil(t, serviceMap[Compactor]) } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 6d8aef534b0..5a2b49d30e3 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -68,19 +68,19 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) { - svcsMap := map[string]services.Service{} + servicesMap := map[string]services.Service{} initMap := map[string]bool{} for _, module := range modules { - if err := m.initModule(module, initMap, svcsMap); err != nil { + if err := m.initModule(module, initMap, servicesMap); err != nil { return nil, err } } - return svcsMap, nil + return servicesMap, nil } -func (m *Manager) initModule(name string, initMap map[string]bool, svcsMap map[string]services.Service) error { +func (m *Manager) initModule(name string, initMap map[string]bool, servicesMap map[string]services.Service) error { if _, ok := m.modules[name]; !ok { return fmt.Errorf("unrecognised module name: %s", name) } @@ -108,12 +108,12 @@ func (m *Manager) initModule(name string, initMap map[string]bool, svcsMap map[s if s != nil { // We pass servicesMap, which isn't yet complete. By the time service starts, // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(svcsMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) + serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) } } if serv != nil { - svcsMap[n] = serv + servicesMap[n] = serv } initMap[n] = true From dfc92dac53738f26f2c44994072c8efd32dbcdd7 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Thu, 8 Oct 2020 19:44:30 +0200 Subject: [PATCH 10/13] Fix problems with PR #3287 Signed-off-by: Igor Novgorodov --- pkg/cortex/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index acd44f108e6..28f326b1b66 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -592,7 +592,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) { func (t *Cortex) initStoreGateway() (serv services.Service, err error) { if t.Cfg.Storage.Engine != storage.StorageEngineBlocks { - if t.Cfg.Target != All { + if !t.Cfg.isModuleEnabled(All) { return nil, fmt.Errorf("storage engine must be set to blocks to enable the store-gateway") } return nil, nil From 80282b730d12b2238e70e031bb0da8c2c77a2d49 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 9 Oct 2020 13:16:26 +0200 Subject: [PATCH 11/13] Update CHANGELOG.md Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index abfd292aec2..9f7bc81d582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,7 +48,7 @@ * [CHANGE] Renamed `-redis.enable-tls` CLI flag to `-redis.tls-enabled`, and its respective YAML config option from `enable_tls` to `tls_enabled`. #3298 * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257 * [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217 -* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3272 +* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275 * [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244 - `-blocks-storage.s3.http.idle-conn-timeout` - `-blocks-storage.s3.http.response-header-timeout` From 0875ecc777c35a23c7c9a2b5d9bffb32de2328ec Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 9 Oct 2020 13:20:08 +0200 Subject: [PATCH 12/13] Update pkg/cortex/cortex_test.go Signed-off-by: Marco Pracucci --- pkg/cortex/cortex_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 8f0968835ec..cd197df579f 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -79,7 +79,6 @@ func TestCortex(t *testing.T) { serviceMap, err := c.ModuleManager.InitModuleServices(cfg.Target...) require.NoError(t, err) require.NotNil(t, serviceMap) - log.Println(serviceMap) for m, s := range serviceMap { // make sure each service is still New From e64d873e73b996044e26f41bb7bfc2f683403728 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 9 Oct 2020 13:25:21 +0200 Subject: [PATCH 13/13] Fixed linter Signed-off-by: Marco Pracucci --- pkg/cortex/cortex_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index cd197df579f..d8f3a8c7829 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -1,7 +1,6 @@ package cortex import ( - "log" "net/url" "testing"