Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [CHANGE] Renamed `-redis.enable-tls` CLI flag to `-redis.tls-enabled`, and its respective YAML config option from `enable_tls` to `tls_enabled`. #3298
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244
- `-blocks-storage.s3.http.idle-conn-timeout`
- `-blocks-storage.s3.http.response-header-timeout`
Expand Down
7 changes: 6 additions & 1 deletion cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ func main() {
// In testing mode skip JAEGER setup to avoid panic due to
// "duplicate metrics collector registration attempted"
if !testMode {
name := "cortex"
if len(cfg.Target) == 1 {
name += "-" + cfg.Target[0]
}

// Setting the environment variable JAEGER_AGENT_HOST enables tracing.
if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil {
if trace, err := tracing.NewFromEnv(name); err != nil {
level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
} else {
defer trace.Close()
Expand Down
6 changes: 4 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Where default_value is the value to use if the environment variable is undefined
### Supported contents and default values of the config file

```yaml
# The Cortex module to run. Use "-modules" command line flag to get a list of
# available modules, and to see which modules are included in "All".
# Comma-separated list of Cortex modules to load. The alias 'all' can be used in
# the list to load a number of core modules and will enable single-binary mode.
# Use '-modules' command line flag to get a list of available modules, and to
# see which modules are included in 'all'.
# CLI flag: -target
[target: <string> | default = "all"]

Expand Down
31 changes: 22 additions & 9 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ import (

// Config is the root config for Cortex.
type Config struct {
Target string `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`
Target flagext.StringSliceCSV `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Expand Down Expand Up @@ -109,7 +109,14 @@ type Config struct {
func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Server.MetricsNamespace = "cortex"
c.Server.ExcludeRequestInLog = true
f.StringVar(&c.Target, "target", All, "The Cortex module to run. Use \"-modules\" command line flag to get a list of available modules, and to see which modules are included in \"All\".")

// Set the default module list to 'all'
c.Target = []string{All}

f.Var(&c.Target, "target", "Comma-separated list of Cortex modules to load. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+
"Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.")

f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")
Expand Down Expand Up @@ -203,6 +210,10 @@ func (c *Config) Validate(log log.Logger) error {
return nil
}

func (c *Config) isModuleEnabled(m string) bool {
return util.StringsContain(c.Target, m)
}

// validateYAMLEmptyNodes ensure that no empty node has been specified in the YAML config file.
// When an empty node is defined in YAML, the YAML parser sets the whole struct to its zero value
// and so we loose all default values. It's very difficult to detect this case for the user, so we
Expand Down Expand Up @@ -307,16 +318,18 @@ func (t *Cortex) setupThanosTracing() {

// Run starts Cortex running, and blocks until a Cortex stops.
func (t *Cortex) Run() error {
if !t.ModuleManager.IsUserVisibleModule(t.Cfg.Target) {
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", t.Cfg.Target)
for _, module := range t.Cfg.Target {
if !t.ModuleManager.IsUserVisibleModule(module) {
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module)
}
}

serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
var err error
t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...)
if err != nil {
return err
}

t.ServiceMap = serviceMap
t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler))

// get all services, create service manager and tell it to start
Expand Down
8 changes: 6 additions & 2 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ func TestCortex(t *testing.T) {
},
},
},
Target: All,

Target: []string{All, Compactor},
}

c, err := New(cfg)
require.NoError(t, err)

serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target)
serviceMap, err := c.ModuleManager.InitModuleServices(cfg.Target...)
require.NoError(t, err)
require.NotNil(t, serviceMap)

Expand All @@ -88,4 +89,7 @@ func TestCortex(t *testing.T) {
require.NotNil(t, serviceMap[IngesterService])
require.NotNil(t, serviceMap[Ring])
require.NotNil(t, serviceMap[DistributorService])

// check that compactor is configured which is not part of Target=All
require.NotNil(t, serviceMap[Compactor])
}
17 changes: 8 additions & 9 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay

// No need to report if this field isn't going to be used.
if t.Cfg.Target == All || t.Cfg.Target == Ruler {
if t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(All) {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.")
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
// ruler's dependency)
canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor)
canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All)

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
if err != nil {
Expand Down Expand Up @@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
}, []string{"method", "route"})

// if we are not configured for single binary mode then the querier needs to register its paths externally
registerExternally := t.Cfg.Target != All
registerExternally := !t.Cfg.isModuleEnabled(All)
handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests)

// single binary mode requires a properly configured worker. if the operator did not attempt to configure the
// worker we will attempt an automatic configuration here
if t.Cfg.Worker.Address == "" && t.Cfg.Target == All {
if t.Cfg.Worker.Address == "" && t.Cfg.isModuleEnabled(All) {
address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort)
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address)
t.Cfg.Worker.Address = address
Expand Down Expand Up @@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l
case storage.StorageEngineBlocks:
// When running in single binary, if the blocks sharding is disabled and no custom
// store-gateway address has been configured, we can set it to the running process.
if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
}

Expand Down Expand Up @@ -510,13 +510,12 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}

t.RulerStorage, err = ruler.NewRuleStorage(t.Cfg.Ruler.StoreConfig, rules.FileLoader{})

return
}

Expand Down Expand Up @@ -574,7 +573,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
return
}

t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager, t.Cfg.Alertmanager.EnableAPI)
t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI)
return t.Alertmanager, nil
}

Expand All @@ -593,7 +592,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) {

func (t *Cortex) initStoreGateway() (serv services.Service, err error) {
if t.Cfg.Storage.Engine != storage.StorageEngineBlocks {
if t.Cfg.Target != All {
if !t.Cfg.isModuleEnabled(All) {
return nil, fmt.Errorf("storage engine must be set to blocks to enable the store-gateway")
}
return nil, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
return services.NewBasicService(nil, runFn, stoppingFn)
}

// DisableSignalHandling puts a dummy signal handler
func DisableSignalHandling(config *server.Config) {
config.SignalHandler = make(ignoreSignalHandler)
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/util/flagext/stringslicecsv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package flagext

import "strings"

// StringSliceCSV is a slice of strings that is parsed from a comma-separated string
// It implements flag.Value and yaml Marshalers
type StringSliceCSV []string

// String implements flag.Value
func (v StringSliceCSV) String() string {
return strings.Join(v, ",")
}

// Set implements flag.Value
func (v *StringSliceCSV) Set(s string) error {
*v = strings.Split(s, ",")
return nil
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}

return v.Set(s)
}

// MarshalYAML implements yaml.Marshaler.
func (v StringSliceCSV) MarshalYAML() (interface{}, error) {
return v.String(), nil
}
34 changes: 34 additions & 0 deletions pkg/util/flagext/stringslicecsv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package flagext

import (
"testing"

"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)

func Test_StringSliceCSV(t *testing.T) {
type TestStruct struct {
CSV StringSliceCSV `yaml:"csv"`
}

var testStruct TestStruct
s := "a,b,c,d"
assert.Nil(t, testStruct.CSV.Set(s))

assert.Equal(t, []string{"a", "b", "c", "d"}, []string(testStruct.CSV))
assert.Equal(t, s, testStruct.CSV.String())

expected := []byte(`csv: a,b,c,d
`)

actual, err := yaml.Marshal(testStruct)
assert.Nil(t, err)
assert.Equal(t, expected, actual)

var testStruct2 TestStruct

err = yaml.Unmarshal(expected, &testStruct2)
assert.Nil(t, err)
assert.Equal(t, testStruct, testStruct2)
}
36 changes: 27 additions & 9 deletions pkg/util/modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,45 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error {
return nil
}

// InitModuleServices initialises the target module by initialising all its dependencies
// InitModuleServices initialises given modules by initialising all their dependencies
// in the right order. Modules are wrapped in such a way that they start after their
// dependencies have been started and stop before their dependencies are stopped.
func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
if _, ok := m.modules[target]; !ok {
return nil, fmt.Errorf("unrecognised module name: %s", target)
func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) {
servicesMap := map[string]services.Service{}
initMap := map[string]bool{}

for _, module := range modules {
if err := m.initModule(module, initMap, servicesMap); err != nil {
return nil, err
}
}

servicesMap := map[string]services.Service{}
return servicesMap, nil
}

func (m *Manager) initModule(name string, initMap map[string]bool, servicesMap map[string]services.Service) error {
if _, ok := m.modules[name]; !ok {
return fmt.Errorf("unrecognised module name: %s", name)
}

// initialize all of our dependencies first
deps := m.orderedDeps(target)
deps = append(deps, target) // lastly, initialize the requested module
deps := m.orderedDeps(name)
deps = append(deps, name) // lastly, initialize the requested module

for ix, n := range deps {
// Skip already initialized modules
if initMap[n] {
continue
}

mod := m.modules[n]

var serv services.Service

if mod.initFn != nil {
s, err := mod.initFn()
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
}

if s != nil {
Expand All @@ -99,9 +115,11 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service
if serv != nil {
servicesMap[n] = serv
}

initMap[n] = true
}

return servicesMap, nil
return nil
}

// UserVisibleModuleNames gets list of module names that are
Expand Down
Loading