Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 51 additions & 51 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,38 +185,38 @@ func (c *Config) Validate(log log.Logger) error {

// Cortex is the root datastructure for Cortex.
type Cortex struct {
target ModuleName
Cfg Config

// set during initialization
serviceMap map[ModuleName]services.Service

api *api.API
server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
flusher *flusher.Flusher
store chunk.Store
deletesStore *purger.DeleteStore
frontend *frontend.Frontend
tableManager *chunk.TableManager
cache cache.Cache
runtimeConfig *runtimeconfig.Manager
dataPurger *purger.DataPurger
tombstonesLoader *purger.TombstonesLoader

ruler *ruler.Ruler
configAPI *configAPI.API
configDB db.DB
alertmanager *alertmanager.MultitenantAlertmanager
compactor *compactor.Compactor
storeGateway *storegateway.StoreGateway
memberlistKV *memberlist.KVInit
ServiceMap map[ModuleName]services.Service

API *api.API
Server *server.Server
Ring *ring.Ring
Overrides *validation.Overrides
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
Store chunk.Store
DeletesStore *purger.DeleteStore
Frontend *frontend.Frontend
TableManager *chunk.TableManager
Cache cache.Cache
RuntimeConfig *runtimeconfig.Manager
DataPurger *purger.DataPurger
TombstonesLoader *purger.TombstonesLoader

Ruler *ruler.Ruler
ConfigAPI *configAPI.API
ConfigDB db.DB
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.Compactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInit

// Queryable that the querier should use to query the long
// term storage. It depends on the storage engine used.
storeQueryable prom_storage.Queryable
StoreQueryable prom_storage.Queryable
}

// New makes a new Cortex.
Expand All @@ -229,28 +229,28 @@ func New(cfg Config) (*Cortex, error) {
}

cortex := &Cortex{
target: cfg.Target,
Cfg: cfg,
}

cortex.setupAuthMiddleware(&cfg)
cortex.setupAuthMiddleware()

serviceMap, err := cortex.initModuleServices(&cfg, cfg.Target)
serviceMap, err := cortex.initModuleServices()
if err != nil {
return nil, err
}

cortex.serviceMap = serviceMap
cortex.api.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler))
cortex.ServiceMap = serviceMap
cortex.API.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler))

return cortex, nil
}

func (t *Cortex) setupAuthMiddleware(cfg *Config) {
if cfg.AuthEnabled {
cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
func (t *Cortex) setupAuthMiddleware() {
if t.Cfg.AuthEnabled {
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
middleware.ServerUserHeaderInterceptor,
}
cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
switch info.FullMethod {
// Don't check auth header on TransferChunks, as we weren't originally
Expand All @@ -266,36 +266,36 @@ func (t *Cortex) setupAuthMiddleware(cfg *Config) {
},
}
} else {
cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
fakeGRPCAuthUniaryMiddleware,
}
cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
fakeGRPCAuthStreamMiddleware,
}
cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
t.Cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
}
}

func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleName]services.Service, error) {
func (t *Cortex) initModuleServices() (map[ModuleName]services.Service, error) {
servicesMap := map[ModuleName]services.Service{}

// initialize all of our dependencies first
deps := orderedDeps(target)
deps = append(deps, target) // lastly, initialize the requested module
deps := orderedDeps(t.Cfg.Target)
deps = append(deps, t.Cfg.Target) // lastly, initialize the requested module

for ix, n := range deps {
mod := modules[n]

var serv services.Service

if mod.service != nil {
s, err := mod.service(t, cfg)
s, err := mod.service(t)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
}
serv = s
} else if mod.wrappedService != nil {
s, err := mod.wrappedService(t, cfg)
s, err := mod.wrappedService(t)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
}
Expand All @@ -318,7 +318,7 @@ func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleN
func (t *Cortex) Run() error {
// get all services, create service manager and tell it to start
servs := []services.Service(nil)
for _, s := range t.serviceMap {
for _, s := range t.ServiceMap {
servs = append(servs, s)
}

Expand All @@ -329,8 +329,8 @@ func (t *Cortex) Run() error {

// before starting servers, register /ready handler and gRPC health check service.
// It should reflect entire Cortex.
t.server.HTTP.Path("/ready").Handler(t.readyHandler(sm))
grpc_health_v1.RegisterHealthServer(t.server.GRPC, healthcheck.New(sm))
t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm))
grpc_health_v1.RegisterHealthServer(t.Server.GRPC, healthcheck.New(sm))

// Let's listen for events from this manager, and log them.
healthy := func() { level.Info(util.Logger).Log("msg", "Cortex started") }
Expand All @@ -340,7 +340,7 @@ func (t *Cortex) Run() error {
sm.StopAsync()

// let's find out which module failed
for m, s := range t.serviceMap {
for m, s := range t.ServiceMap {
if s == service {
if service.FailureCase() == util.ErrStopProcess {
level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase())
Expand All @@ -361,7 +361,7 @@ func (t *Cortex) Run() error {
// It will also be stopped via service manager if any service fails (see attached service listener)
// Attach listener before starting services, or we may miss the notification.
serverStopping := make(chan struct{})
t.serviceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) {
t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) {
close(serverStopping)
}, nil, nil))

Expand Down Expand Up @@ -411,8 +411,8 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc {

// Ingester has a special check that makes sure that it was able to register into the ring,
// and that all other ring entries are OK too.
if t.ingester != nil {
if err := t.ingester.CheckReady(r.Context()); err != nil {
if t.Ingester != nil {
if err := t.Ingester.CheckReady(r.Context()); err != nil {
http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ func TestCortex(t *testing.T) {

c, err := New(cfg)
require.NoError(t, err)
require.NotNil(t, c.serviceMap)
require.NotNil(t, c.ServiceMap)

for m, s := range c.serviceMap {
for m, s := range c.ServiceMap {
// make sure each service is still New
require.Equal(t, services.New, s.State(), "module: %s", m)
}

// check random modules that we expect to be configured when using Target=All
require.NotNil(t, c.serviceMap[Server])
require.NotNil(t, c.serviceMap[Ingester])
require.NotNil(t, c.serviceMap[Ring])
require.NotNil(t, c.serviceMap[Distributor])
require.NotNil(t, c.ServiceMap[Server])
require.NotNil(t, c.ServiceMap[Ingester])
require.NotNil(t, c.ServiceMap[Ring])
require.NotNil(t, c.ServiceMap[Distributor])

// check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned.
require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps))
Expand Down
Loading