diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d119f35d4..9f7bc81d582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ * [CHANGE] Renamed `-redis.enable-tls` CLI flag to `-redis.tls-enabled`, and its respective YAML config option from `enable_tls` to `tls_enabled`. #3298 * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257 * [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217 +* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275 * [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244 - `-blocks-storage.s3.http.idle-conn-timeout` - `-blocks-storage.s3.http.response-header-timeout` diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index ec72a041207..8955696326b 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -146,8 +146,13 @@ func main() { // In testing mode skip JAEGER setup to avoid panic due to // "duplicate metrics collector registration attempted" if !testMode { + name := "cortex" + if len(cfg.Target) == 1 { + name += "-" + cfg.Target[0] + } + // Setting the environment variable JAEGER_AGENT_HOST enables tracing. - if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil { + if trace, err := tracing.NewFromEnv(name); err != nil { level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error()) } else { defer trace.Close() diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index bc6bfdd2a08..b3c48d0e9eb 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -52,8 +52,10 @@ Where default_value is the value to use if the environment variable is undefined ### Supported contents and default values of the config file ```yaml -# The Cortex module to run. Use "-modules" command line flag to get a list of -# available modules, and to see which modules are included in "All". +# Comma-separated list of Cortex modules to load. The alias 'all' can be used in +# the list to load a number of core modules and will enable single-binary mode. +# Use '-modules' command line flag to get a list of available modules, and to +# see which modules are included in 'all'. # CLI flag: -target [target: | default = "all"] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 0704f39f8eb..336232f26cf 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -71,10 +71,10 @@ import ( // Config is the root config for Cortex. type Config struct { - Target string `yaml:"target"` - AuthEnabled bool `yaml:"auth_enabled"` - PrintConfig bool `yaml:"-"` - HTTPPrefix string `yaml:"http_prefix"` + Target flagext.StringSliceCSV `yaml:"target"` + AuthEnabled bool `yaml:"auth_enabled"` + PrintConfig bool `yaml:"-"` + HTTPPrefix string `yaml:"http_prefix"` API api.Config `yaml:"api"` Server server.Config `yaml:"server"` @@ -109,7 +109,14 @@ type Config struct { func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.MetricsNamespace = "cortex" c.Server.ExcludeRequestInLog = true - f.StringVar(&c.Target, "target", All, "The Cortex module to run. Use \"-modules\" command line flag to get a list of available modules, and to see which modules are included in \"All\".") + + // Set the default module list to 'all' + c.Target = []string{All} + + f.Var(&c.Target, "target", "Comma-separated list of Cortex modules to load. "+ + "The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+ + "Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.") + f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.") @@ -203,6 +210,10 @@ func (c *Config) Validate(log log.Logger) error { return nil } +func (c *Config) isModuleEnabled(m string) bool { + return util.StringsContain(c.Target, m) +} + // validateYAMLEmptyNodes ensure that no empty node has been specified in the YAML config file. // When an empty node is defined in YAML, the YAML parser sets the whole struct to its zero value // and so we loose all default values. It's very difficult to detect this case for the user, so we @@ -307,16 +318,18 @@ func (t *Cortex) setupThanosTracing() { // Run starts Cortex running, and blocks until a Cortex stops. func (t *Cortex) Run() error { - if !t.ModuleManager.IsUserVisibleModule(t.Cfg.Target) { - level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", t.Cfg.Target) + for _, module := range t.Cfg.Target { + if !t.ModuleManager.IsUserVisibleModule(module) { + level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module) + } } - serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target) + var err error + t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...) if err != nil { return err } - t.ServiceMap = serviceMap t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) // get all services, create service manager and tell it to start diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 1c9070a996b..d8f3a8c7829 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -68,13 +68,14 @@ func TestCortex(t *testing.T) { }, }, }, - Target: All, + + Target: []string{All, Compactor}, } c, err := New(cfg) require.NoError(t, err) - serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target) + serviceMap, err := c.ModuleManager.InitModuleServices(cfg.Target...) require.NoError(t, err) require.NotNil(t, serviceMap) @@ -88,4 +89,7 @@ func TestCortex(t *testing.T) { require.NotNil(t, serviceMap[IngesterService]) require.NotNil(t, serviceMap[Ring]) require.NotNil(t, serviceMap[DistributorService]) + + // check that compactor is configured which is not part of Target=All + require.NotNil(t, serviceMap[Compactor]) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f37a87324d2..28f326b1b66 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay // No need to report if this field isn't going to be used. - if t.Cfg.Target == All || t.Cfg.Target == Ruler { + if t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(All) { flagext.DeprecatedFlagsUsed.Inc() level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.") } @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) + canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All) t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer) if err != nil { @@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { }, []string{"method", "route"}) // if we are not configured for single binary mode then the querier needs to register its paths externally - registerExternally := t.Cfg.Target != All + registerExternally := !t.Cfg.isModuleEnabled(All) handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests) // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if t.Cfg.Worker.Address == "" && t.Cfg.Target == All { + if t.Cfg.Worker.Address == "" && t.Cfg.isModuleEnabled(All) { address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) t.Cfg.Worker.Address = address @@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l case storage.StorageEngineBlocks: // When running in single binary, if the blocks sharding is disabled and no custom // store-gateway address has been configured, we can set it to the running process. - if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { + if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) } @@ -510,13 +510,12 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() { + if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.") return } t.RulerStorage, err = ruler.NewRuleStorage(t.Cfg.Ruler.StoreConfig, rules.FileLoader{}) - return } @@ -574,7 +573,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { return } - t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager, t.Cfg.Alertmanager.EnableAPI) + t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI) return t.Alertmanager, nil } @@ -593,7 +592,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) { func (t *Cortex) initStoreGateway() (serv services.Service, err error) { if t.Cfg.Storage.Engine != storage.StorageEngineBlocks { - if t.Cfg.Target != All { + if !t.Cfg.isModuleEnabled(All) { return nil, fmt.Errorf("storage engine must be set to blocks to enable the store-gateway") } return nil, nil diff --git a/pkg/cortex/server_service.go b/pkg/cortex/server_service.go index c5a84942e81..7a565fbfb01 100644 --- a/pkg/cortex/server_service.go +++ b/pkg/cortex/server_service.go @@ -54,6 +54,7 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S return services.NewBasicService(nil, runFn, stoppingFn) } +// DisableSignalHandling puts a dummy signal handler func DisableSignalHandling(config *server.Config) { config.SignalHandler = make(ignoreSignalHandler) } diff --git a/pkg/util/flagext/stringslicecsv.go b/pkg/util/flagext/stringslicecsv.go new file mode 100644 index 00000000000..47ccd54ca08 --- /dev/null +++ b/pkg/util/flagext/stringslicecsv.go @@ -0,0 +1,33 @@ +package flagext + +import "strings" + +// StringSliceCSV is a slice of strings that is parsed from a comma-separated string +// It implements flag.Value and yaml Marshalers +type StringSliceCSV []string + +// String implements flag.Value +func (v StringSliceCSV) String() string { + return strings.Join(v, ",") +} + +// Set implements flag.Value +func (v *StringSliceCSV) Set(s string) error { + *v = strings.Split(s, ",") + return nil +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + return v.Set(s) +} + +// MarshalYAML implements yaml.Marshaler. +func (v StringSliceCSV) MarshalYAML() (interface{}, error) { + return v.String(), nil +} diff --git a/pkg/util/flagext/stringslicecsv_test.go b/pkg/util/flagext/stringslicecsv_test.go new file mode 100644 index 00000000000..67e50ba1df4 --- /dev/null +++ b/pkg/util/flagext/stringslicecsv_test.go @@ -0,0 +1,34 @@ +package flagext + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func Test_StringSliceCSV(t *testing.T) { + type TestStruct struct { + CSV StringSliceCSV `yaml:"csv"` + } + + var testStruct TestStruct + s := "a,b,c,d" + assert.Nil(t, testStruct.CSV.Set(s)) + + assert.Equal(t, []string{"a", "b", "c", "d"}, []string(testStruct.CSV)) + assert.Equal(t, s, testStruct.CSV.String()) + + expected := []byte(`csv: a,b,c,d +`) + + actual, err := yaml.Marshal(testStruct) + assert.Nil(t, err) + assert.Equal(t, expected, actual) + + var testStruct2 TestStruct + + err = yaml.Unmarshal(expected, &testStruct2) + assert.Nil(t, err) + assert.Equal(t, testStruct, testStruct2) +} diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 1af768016bf..5a2b49d30e3 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -64,21 +64,37 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { return nil } -// InitModuleServices initialises the target module by initialising all its dependencies +// InitModuleServices initialises given modules by initialising all their dependencies // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. -func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { - if _, ok := m.modules[target]; !ok { - return nil, fmt.Errorf("unrecognised module name: %s", target) +func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) { + servicesMap := map[string]services.Service{} + initMap := map[string]bool{} + + for _, module := range modules { + if err := m.initModule(module, initMap, servicesMap); err != nil { + return nil, err + } } - servicesMap := map[string]services.Service{} + return servicesMap, nil +} + +func (m *Manager) initModule(name string, initMap map[string]bool, servicesMap map[string]services.Service) error { + if _, ok := m.modules[name]; !ok { + return fmt.Errorf("unrecognised module name: %s", name) + } // initialize all of our dependencies first - deps := m.orderedDeps(target) - deps = append(deps, target) // lastly, initialize the requested module + deps := m.orderedDeps(name) + deps = append(deps, name) // lastly, initialize the requested module for ix, n := range deps { + // Skip already initialized modules + if initMap[n] { + continue + } + mod := m.modules[n] var serv services.Service @@ -86,7 +102,7 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service if mod.initFn != nil { s, err := mod.initFn() if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) + return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } if s != nil { @@ -99,9 +115,11 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service if serv != nil { servicesMap[n] = serv } + + initMap[n] = true } - return servicesMap, nil + return nil } // UserVisibleModuleNames gets list of module names that are diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 8734c3c9b05..f62fdd62a99 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -1,6 +1,7 @@ package modules import ( + "errors" "fmt" "testing" @@ -10,7 +11,9 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" ) -func mockInitFunc() (services.Service, error) { return nil, nil } +func mockInitFunc() (services.Service, error) { return services.NewIdleService(nil, nil), nil } + +func mockInitFuncFail() (services.Service, error) { return nil, errors.New("Error") } func TestDependencies(t *testing.T) { var testModules = map[string]module{ @@ -25,6 +28,10 @@ func TestDependencies(t *testing.T) { "serviceC": { initFn: mockInitFunc, }, + + "serviceD": { + initFn: mockInitFuncFail, + }, } mm := NewManager() @@ -39,13 +46,30 @@ func TestDependencies(t *testing.T) { require.Len(t, invDeps, 1) assert.Equal(t, invDeps[0], "serviceB") - svcs, err := mm.InitModuleServices("serviceC") - assert.NotNil(t, svcs) + // Test unknown module + svc, err := mm.InitModuleServices("service_unknown") + assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) + assert.Empty(t, svc) + + // Test init failure + svc, err = mm.InitModuleServices("serviceD") + assert.Error(t, err) + assert.Empty(t, svc) + + // Test loading several modules + svc, err = mm.InitModuleServices("serviceA", "serviceB") + assert.Nil(t, err) + assert.Equal(t, 2, len(svc)) + + svc, err = mm.InitModuleServices("serviceC") assert.NoError(t, err) + assert.Equal(t, 3, len(svc)) - svcs, err = mm.InitModuleServices("service_unknown") - assert.Nil(t, svcs) - assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) + // Test loading of the module second time - should produce the same set of services, but new instances. + svc2, err := mm.InitModuleServices("serviceC") + assert.NoError(t, err) + assert.Equal(t, 3, len(svc)) + assert.NotEqual(t, svc, svc2) } func TestRegisterModuleDefaultsToUserVisible(t *testing.T) { diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index f0480412d14..2beadb2627e 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -241,6 +241,8 @@ func getFieldType(t reflect.Type) (string, error) { return "duration", nil case "cortex.moduleName": return "string", nil + case "flagext.StringSliceCSV": + return "string", nil } // Fallback to auto-detection of built-in data types