Skip to content

Commit 91f7470

Browse files
authored
Refactor Cortex struct as part of Generalising Module Service (#2570)
* Redefine variable scope, refactor Cortex struct as part of Generalising module service Signed-off-by: Annanay <[email protected]> * Fix lint, tests Signed-off-by: Annanay <[email protected]>
1 parent 7d1ee7f commit 91f7470

File tree

4 files changed

+219
-220
lines changed

4 files changed

+219
-220
lines changed

pkg/cortex/cortex.go

Lines changed: 51 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -185,38 +185,38 @@ func (c *Config) Validate(log log.Logger) error {
185185

186186
// Cortex is the root datastructure for Cortex.
187187
type Cortex struct {
188-
target ModuleName
188+
Cfg Config
189189

190190
// set during initialization
191-
serviceMap map[ModuleName]services.Service
192-
193-
api *api.API
194-
server *server.Server
195-
ring *ring.Ring
196-
overrides *validation.Overrides
197-
distributor *distributor.Distributor
198-
ingester *ingester.Ingester
199-
flusher *flusher.Flusher
200-
store chunk.Store
201-
deletesStore *purger.DeleteStore
202-
frontend *frontend.Frontend
203-
tableManager *chunk.TableManager
204-
cache cache.Cache
205-
runtimeConfig *runtimeconfig.Manager
206-
dataPurger *purger.DataPurger
207-
tombstonesLoader *purger.TombstonesLoader
208-
209-
ruler *ruler.Ruler
210-
configAPI *configAPI.API
211-
configDB db.DB
212-
alertmanager *alertmanager.MultitenantAlertmanager
213-
compactor *compactor.Compactor
214-
storeGateway *storegateway.StoreGateway
215-
memberlistKV *memberlist.KVInit
191+
ServiceMap map[ModuleName]services.Service
192+
193+
API *api.API
194+
Server *server.Server
195+
Ring *ring.Ring
196+
Overrides *validation.Overrides
197+
Distributor *distributor.Distributor
198+
Ingester *ingester.Ingester
199+
Flusher *flusher.Flusher
200+
Store chunk.Store
201+
DeletesStore *purger.DeleteStore
202+
Frontend *frontend.Frontend
203+
TableManager *chunk.TableManager
204+
Cache cache.Cache
205+
RuntimeConfig *runtimeconfig.Manager
206+
DataPurger *purger.DataPurger
207+
TombstonesLoader *purger.TombstonesLoader
208+
209+
Ruler *ruler.Ruler
210+
ConfigAPI *configAPI.API
211+
ConfigDB db.DB
212+
Alertmanager *alertmanager.MultitenantAlertmanager
213+
Compactor *compactor.Compactor
214+
StoreGateway *storegateway.StoreGateway
215+
MemberlistKV *memberlist.KVInit
216216

217217
// Queryable that the querier should use to query the long
218218
// term storage. It depends on the storage engine used.
219-
storeQueryable prom_storage.Queryable
219+
StoreQueryable prom_storage.Queryable
220220
}
221221

222222
// New makes a new Cortex.
@@ -229,28 +229,28 @@ func New(cfg Config) (*Cortex, error) {
229229
}
230230

231231
cortex := &Cortex{
232-
target: cfg.Target,
232+
Cfg: cfg,
233233
}
234234

235-
cortex.setupAuthMiddleware(&cfg)
235+
cortex.setupAuthMiddleware()
236236

237-
serviceMap, err := cortex.initModuleServices(&cfg, cfg.Target)
237+
serviceMap, err := cortex.initModuleServices()
238238
if err != nil {
239239
return nil, err
240240
}
241241

242-
cortex.serviceMap = serviceMap
243-
cortex.api.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler))
242+
cortex.ServiceMap = serviceMap
243+
cortex.API.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler))
244244

245245
return cortex, nil
246246
}
247247

248-
func (t *Cortex) setupAuthMiddleware(cfg *Config) {
249-
if cfg.AuthEnabled {
250-
cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
248+
func (t *Cortex) setupAuthMiddleware() {
249+
if t.Cfg.AuthEnabled {
250+
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
251251
middleware.ServerUserHeaderInterceptor,
252252
}
253-
cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
253+
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
254254
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
255255
switch info.FullMethod {
256256
// Don't check auth header on TransferChunks, as we weren't originally
@@ -266,36 +266,36 @@ func (t *Cortex) setupAuthMiddleware(cfg *Config) {
266266
},
267267
}
268268
} else {
269-
cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
269+
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
270270
fakeGRPCAuthUniaryMiddleware,
271271
}
272-
cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
272+
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
273273
fakeGRPCAuthStreamMiddleware,
274274
}
275-
cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
275+
t.Cfg.API.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
276276
}
277277
}
278278

279-
func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleName]services.Service, error) {
279+
func (t *Cortex) initModuleServices() (map[ModuleName]services.Service, error) {
280280
servicesMap := map[ModuleName]services.Service{}
281281

282282
// initialize all of our dependencies first
283-
deps := orderedDeps(target)
284-
deps = append(deps, target) // lastly, initialize the requested module
283+
deps := orderedDeps(t.Cfg.Target)
284+
deps = append(deps, t.Cfg.Target) // lastly, initialize the requested module
285285

286286
for ix, n := range deps {
287287
mod := modules[n]
288288

289289
var serv services.Service
290290

291291
if mod.service != nil {
292-
s, err := mod.service(t, cfg)
292+
s, err := mod.service(t)
293293
if err != nil {
294294
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
295295
}
296296
serv = s
297297
} else if mod.wrappedService != nil {
298-
s, err := mod.wrappedService(t, cfg)
298+
s, err := mod.wrappedService(t)
299299
if err != nil {
300300
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
301301
}
@@ -318,7 +318,7 @@ func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleN
318318
func (t *Cortex) Run() error {
319319
// get all services, create service manager and tell it to start
320320
servs := []services.Service(nil)
321-
for _, s := range t.serviceMap {
321+
for _, s := range t.ServiceMap {
322322
servs = append(servs, s)
323323
}
324324

@@ -329,8 +329,8 @@ func (t *Cortex) Run() error {
329329

330330
// before starting servers, register /ready handler and gRPC health check service.
331331
// It should reflect entire Cortex.
332-
t.server.HTTP.Path("/ready").Handler(t.readyHandler(sm))
333-
grpc_health_v1.RegisterHealthServer(t.server.GRPC, healthcheck.New(sm))
332+
t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm))
333+
grpc_health_v1.RegisterHealthServer(t.Server.GRPC, healthcheck.New(sm))
334334

335335
// Let's listen for events from this manager, and log them.
336336
healthy := func() { level.Info(util.Logger).Log("msg", "Cortex started") }
@@ -340,7 +340,7 @@ func (t *Cortex) Run() error {
340340
sm.StopAsync()
341341

342342
// let's find out which module failed
343-
for m, s := range t.serviceMap {
343+
for m, s := range t.ServiceMap {
344344
if s == service {
345345
if service.FailureCase() == util.ErrStopProcess {
346346
level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase())
@@ -361,7 +361,7 @@ func (t *Cortex) Run() error {
361361
// It will also be stopped via service manager if any service fails (see attached service listener)
362362
// Attach listener before starting services, or we may miss the notification.
363363
serverStopping := make(chan struct{})
364-
t.serviceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) {
364+
t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) {
365365
close(serverStopping)
366366
}, nil, nil))
367367

@@ -411,8 +411,8 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc {
411411

412412
// Ingester has a special check that makes sure that it was able to register into the ring,
413413
// and that all other ring entries are OK too.
414-
if t.ingester != nil {
415-
if err := t.ingester.CheckReady(r.Context()); err != nil {
414+
if t.Ingester != nil {
415+
if err := t.Ingester.CheckReady(r.Context()); err != nil {
416416
http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable)
417417
return
418418
}

pkg/cortex/cortex_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,18 @@ func TestCortex(t *testing.T) {
5252

5353
c, err := New(cfg)
5454
require.NoError(t, err)
55-
require.NotNil(t, c.serviceMap)
55+
require.NotNil(t, c.ServiceMap)
5656

57-
for m, s := range c.serviceMap {
57+
for m, s := range c.ServiceMap {
5858
// make sure each service is still New
5959
require.Equal(t, services.New, s.State(), "module: %s", m)
6060
}
6161

6262
// check random modules that we expect to be configured when using Target=All
63-
require.NotNil(t, c.serviceMap[Server])
64-
require.NotNil(t, c.serviceMap[Ingester])
65-
require.NotNil(t, c.serviceMap[Ring])
66-
require.NotNil(t, c.serviceMap[Distributor])
63+
require.NotNil(t, c.ServiceMap[Server])
64+
require.NotNil(t, c.ServiceMap[Ingester])
65+
require.NotNil(t, c.ServiceMap[Ring])
66+
require.NotNil(t, c.ServiceMap[Distributor])
6767

6868
// check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned.
6969
require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps))

0 commit comments

Comments
 (0)