Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
- `POST /ingester/push`
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3272
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244
- `-blocks-storage.s3.http.idle-conn-timeout`
- `-blocks-storage.s3.http.response-header-timeout`
Expand Down
7 changes: 6 additions & 1 deletion cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ func main() {
// In testing mode skip JAEGER setup to avoid panic due to
// "duplicate metrics collector registration attempted"
if !testMode {
name := "cortex"
if len(cfg.Target) == 1 {
name += "-" + cfg.Target[0]
}

// Setting the environment variable JAEGER_AGENT_HOST enables tracing.
if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil {
if trace, err := tracing.NewFromEnv(name); err != nil {
level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
} else {
defer trace.Close()
Expand Down
6 changes: 4 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Where default_value is the value to use if the environment variable is undefined
### Supported contents and default values of the config file

```yaml
# The Cortex module to run. Use "-modules" command line flag to get a list of
# available modules, and to see which modules are included in "All".
# List of Cortex modules to load, comma separated. The alias 'all' can be used
# in the list to load a number of core modules and will enable single-binary
# mode. Use '-modules' command line flag to get a list of available modules, and
# to see which modules are included in 'all'.
# CLI flag: -target
[target: <string> | default = "all"]

Expand Down
42 changes: 31 additions & 11 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ import (

// Config is the root config for Cortex.
type Config struct {
Target string `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`
Target flagext.StringSliceCSV `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Expand Down Expand Up @@ -109,7 +109,15 @@ type Config struct {
func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Server.MetricsNamespace = "cortex"
c.Server.ExcludeRequestInLog = true
f.StringVar(&c.Target, "target", All, "The Cortex module to run. Use \"-modules\" command line flag to get a list of available modules, and to see which modules are included in \"All\".")

// Set the default module list to 'all'
// Make linter happy
c.Target.Set(All) //nolint:errcheck

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Set the default module list to 'all'
// Make linter happy
c.Target.Set(All) //nolint:errcheck
c.Target = []string{All}

f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "List of Cortex modules to load, comma separated. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+
"Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
f.Var((*flagext.StringSliceCSV)(&c.Target), "target", "List of Cortex modules to load, comma separated. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+
"Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.")
f.Var(&c.Target, "target", "Comma-separated list of Cortex modules to load. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+
"Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.")

f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")
Expand Down Expand Up @@ -203,6 +211,10 @@ func (c *Config) Validate(log log.Logger) error {
return nil
}

func (c *Config) isModuleEnabled(m string) bool {
return util.StringsContain(c.Target, m)
}

// validateYAMLEmptyNodes ensure that no empty node has been specified in the YAML config file.
// When an empty node is defined in YAML, the YAML parser sets the whole struct to its zero value
// and so we loose all default values. It's very difficult to detect this case for the user, so we
Expand Down Expand Up @@ -305,19 +317,27 @@ func (t *Cortex) setupThanosTracing() {
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor)
}

// Run starts Cortex running, and blocks until a Cortex stops.
func (t *Cortex) Run() error {
if !t.ModuleManager.IsUserVisibleModule(t.Cfg.Target) {
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", t.Cfg.Target)
func (t *Cortex) initModules() (err error) {
for _, module := range t.Cfg.Target {
if !t.ModuleManager.IsUserVisibleModule(module) {
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module)
}
}

serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...)
if err != nil {
return err
}

t.ServiceMap = serviceMap
t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler))
return nil
}

// Run starts Cortex running, and blocks until a Cortex stops.
func (t *Cortex) Run() error {
if err := t.initModules(); err != nil {
return err
}

// get all services, create service manager and tell it to start
servs := []services.Service(nil)
Expand Down
21 changes: 13 additions & 8 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cortex

import (
"fmt"
"net/url"
"testing"

Expand Down Expand Up @@ -68,24 +69,28 @@ func TestCortex(t *testing.T) {
},
},
},
Target: All,
}

cfg.Target.Set(fmt.Sprintf("%s,%s", All, Compactor)) //nolint:errcheck

c, err := New(cfg)
require.NoError(t, err)

serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target)
err = c.initModules()
require.NoError(t, err)
require.NotNil(t, serviceMap)
require.NotNil(t, c.ServiceMap)

for m, s := range serviceMap {
for m, s := range c.ServiceMap {
// make sure each service is still New
require.Equal(t, services.New, s.State(), "module: %s", m)
}

// check random modules that we expect to be configured when using Target=All
require.NotNil(t, serviceMap[Server])
require.NotNil(t, serviceMap[IngesterService])
require.NotNil(t, serviceMap[Ring])
require.NotNil(t, serviceMap[DistributorService])
require.NotNil(t, c.ServiceMap[Server])
require.NotNil(t, c.ServiceMap[IngesterService])
require.NotNil(t, c.ServiceMap[Ring])
require.NotNil(t, c.ServiceMap[DistributorService])

// check that compactor is configured which is not part of Target=All
require.NotNil(t, c.ServiceMap[Compactor])
}
15 changes: 7 additions & 8 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay

// No need to report if this field isn't going to be used.
if t.Cfg.Target == All || t.Cfg.Target == Ruler {
if t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(All) {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.")
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
// ruler's dependency)
canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor)
canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All)

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
if err != nil {
Expand Down Expand Up @@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
}, []string{"method", "route"})

// if we are not configured for single binary mode then the querier needs to register its paths externally
registerExternally := t.Cfg.Target != All
registerExternally := !t.Cfg.isModuleEnabled(All)
handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests)

// single binary mode requires a properly configured worker. if the operator did not attempt to configure the
// worker we will attempt an automatic configuration here
if t.Cfg.Worker.Address == "" && t.Cfg.Target == All {
if t.Cfg.Worker.Address == "" && t.Cfg.isModuleEnabled(All) {
address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort)
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address)
t.Cfg.Worker.Address = address
Expand Down Expand Up @@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l
case storage.StorageEngineBlocks:
// When running in single binary, if the blocks sharding is disabled and no custom
// store-gateway address has been configured, we can set it to the running process.
if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
}

Expand Down Expand Up @@ -510,13 +510,12 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}

t.RulerStorage, err = ruler.NewRuleStorage(t.Cfg.Ruler.StoreConfig, rules.FileLoader{})

return
}

Expand Down Expand Up @@ -574,7 +573,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
return
}

t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager, t.Cfg.Alertmanager.EnableAPI)
t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI)
return t.Alertmanager, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
return services.NewBasicService(nil, runFn, stoppingFn)
}

// DisableSignalHandling puts a dummy signal handler
func DisableSignalHandling(config *server.Config) {
config.SignalHandler = make(ignoreSignalHandler)
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/util/flagext/stringslicecsv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package flagext

import "strings"

// StringSliceCSV is a slice of strings that is parsed from a comma-separated string
// It implements flag.Value and yaml Marshalers
type StringSliceCSV []string

// String implements flag.Value
func (v StringSliceCSV) String() string {
return strings.Join(v, ",")
}

// Set implements flag.Value
func (v *StringSliceCSV) Set(s string) error {
*v = strings.Split(s, ",")
return nil
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}

return v.Set(s)
}

// MarshalYAML implements yaml.Marshaler.
func (v StringSliceCSV) MarshalYAML() (interface{}, error) {
return v.String(), nil
}
34 changes: 34 additions & 0 deletions pkg/util/flagext/stringslicecsv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package flagext

import (
"testing"

"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)

func Test_StringSliceCSV(t *testing.T) {
type TestStruct struct {
CSV StringSliceCSV `yaml:"csv"`
}

var testStruct TestStruct
s := "a,b,c,d"
assert.Nil(t, testStruct.CSV.Set(s))

assert.Equal(t, []string{"a", "b", "c", "d"}, []string(testStruct.CSV))
assert.Equal(t, s, testStruct.CSV.String())

expected := []byte(`csv: a,b,c,d
`)

actual, err := yaml.Marshal(testStruct)
assert.Nil(t, err)
assert.Equal(t, expected, actual)

var testStruct2 TestStruct

err = yaml.Unmarshal(expected, &testStruct2)
assert.Nil(t, err)
assert.Equal(t, testStruct, testStruct2)
}
47 changes: 35 additions & 12 deletions pkg/util/modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ type module struct {
// in the right order of dependencies.
type Manager struct {
modules map[string]*module

// Modules that are already initialized
modulesInitialized map[string]bool

// Service map
servicesMap map[string]services.Service
}

// UserInvisibleModule is an option for `RegisterModule` that marks module not visible to user. Modules are user visible by default.
Expand All @@ -35,7 +41,9 @@ func UserInvisibleModule(m *module) {
// NewManager creates a new Manager
func NewManager() *Manager {
return &Manager{
modules: make(map[string]*module),
modules: make(map[string]*module),
modulesInitialized: make(map[string]bool),
servicesMap: make(map[string]services.Service),
}
}

Expand Down Expand Up @@ -64,44 +72,59 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error {
return nil
}

// InitModuleServices initialises the target module by initialising all its dependencies
// InitModuleServices initialises given modules by initialising all their dependencies
// in the right order. Modules are wrapped in such a way that they start after their
// dependencies have been started and stop before their dependencies are stopped.
func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
if _, ok := m.modules[target]; !ok {
return nil, fmt.Errorf("unrecognised module name: %s", target)
func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) {
for _, module := range modules {
if err := m.initModule(module); err != nil {
return nil, err
}
}

servicesMap := map[string]services.Service{}
return m.servicesMap, nil
}

func (m *Manager) initModule(name string) error {
if _, ok := m.modules[name]; !ok {
return fmt.Errorf("unrecognised module name: %s", name)
}

// initialize all of our dependencies first
deps := m.orderedDeps(target)
deps = append(deps, target) // lastly, initialize the requested module
deps := m.orderedDeps(name)
deps = append(deps, name) // lastly, initialize the requested module

for ix, n := range deps {
// Skip already loaded modules
if m.modulesInitialized[n] {
continue
}

mod := m.modules[n]

var serv services.Service

if mod.initFn != nil {
s, err := mod.initFn()
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
}

if s != nil {
// We pass servicesMap, which isn't yet complete. By the time service starts,
// it will be fully built, so there is no need for extra synchronization.
serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
serv = newModuleServiceWrapper(m.servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
}
}

if serv != nil {
servicesMap[n] = serv
m.servicesMap[n] = serv
}

m.modulesInitialized[n] = true
}

return servicesMap, nil
return nil
}

// UserVisibleModuleNames gets list of module names that are
Expand Down
Loading