Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
* `cortex_querier_blocks_consistency_checks_failed_total`
* `cortex_querier_storegateway_refetches_per_query`
* [ENHANCEMENT] Cortex is now built with Go 1.14. #2480 #2753
* [ENHANCEMENT] Experimental: Querier can now optionally query a secondary store. This is enabled with the `-querier.second-store-engine` option, which accepts the values `chunks` or `tsdb`. The standard configuration options for that store are used. Additionally, the secondary store can be restricted to queries that need data older than `-querier.use-second-store-before-time`. The default value of zero means the secondary store is always queried. #2747
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
* `<string>`: a regular string
* `<url>`: an URL
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)

### Use environment variables in the configuration

Expand Down Expand Up @@ -667,6 +668,15 @@ store_gateway_client:
# TLS CA path for the client
# CLI flag: -experimental.querier.store-gateway-client.tls-ca-path
[tls_ca_path: <string> | default = ""]

# Second store engine to use for querying. Empty = disabled.
# CLI flag: -querier.second-store-engine
[second_store_engine: <string> | default = ""]

# If specified, second store is only used for queries before this timestamp.
# Default value 0 means secondary store is always queried.
# CLI flag: -querier.use-second-store-before-time
[use_second_store_before_time: <time> | default = 0]
```

### `query_frontend_config`
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/config-file-reference.template
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
* `<string>`: a regular string
* `<url>`: an URL
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)

### Use environment variables in the configuration

Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ Currently experimental features are:
- In-memory (FIFO) and Redis cache.
- Openstack Swift storage.
- gRPC Store.
- Querier support for querying chunks and blocks store at the same time.
20 changes: 15 additions & 5 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}

func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]IndexQuery) []IndexQuery) ([]string, error) {
log, ctx := spanlogger.New(ctx, "Store.lookupIdsByMetricNameMatcher", "metricName", metricName, "matcher", matcher)
log, ctx := spanlogger.New(ctx, "Store.lookupIdsByMetricNameMatcher", "metricName", metricName, "matcher", formatMatcher(matcher))
defer log.Span.Finish()

var err error
Expand Down Expand Up @@ -474,11 +474,11 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "queries", len(queries))
level.Debug(log).Log("matcher", formatMatcher(matcher), "queries", len(queries))

if filter != nil {
queries = filter(queries)
level.Debug(log).Log("matcher", matcher, "filteredQueries", len(queries))
level.Debug(log).Log("matcher", formatMatcher(matcher), "filteredQueries", len(queries))
}

entries, err := c.lookupEntriesByQueries(ctx, queries)
Expand All @@ -489,17 +489,27 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
} else if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "entries", len(entries))
level.Debug(log).Log("matcher", formatMatcher(matcher), "entries", len(entries))

ids, err := c.parseIndexEntries(ctx, entries, matcher)
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "ids", len(ids))
level.Debug(log).Log("matcher", formatMatcher(matcher), "ids", len(ids))

return ids, nil
}

// formatMatcher returns the text used to represent matcher in log and span fields.
// A nil matcher is rendered as the literal "nil" through an explicit check: logging a
// nil matcher directly also works, but only indirectly via panic and recover, which
// confuses an attached debugger that wants to break on every panic. The explicit check
// is also faster.
func formatMatcher(matcher *labels.Matcher) string {
	if matcher != nil {
		return matcher.String()
	}
	return "nil"
}

func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()
Expand Down
5 changes: 2 additions & 3 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
Expand Down Expand Up @@ -218,9 +217,9 @@ type Cortex struct {
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService

// Queryable that the querier should use to query the long
// Queryables that the querier should use to query the long
// term storage. They depend on the storage engines used.
StoreQueryable prom_storage.Queryable
StoreQueryables []querier.QueryableWithFilter
}

// New makes a new Cortex.
Expand Down
91 changes: 65 additions & 26 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package cortex
import (
"fmt"
"os"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
prom_storage "github.com/prometheus/prometheus/storage"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/server"
Expand Down Expand Up @@ -162,7 +164,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) {
}

func (t *Cortex) initQuerier() (serv services.Service, err error) {
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, t.TombstonesLoader, prometheus.DefaultRegisterer)
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)

// Prometheus histograms for requests to the querier.
querierRequestDuration := promauto.With(prometheus.DefaultRegisterer).NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -194,37 +196,74 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
return worker, nil
}

func (t *Cortex) initStoreQueryable() (services.Service, error) {
if t.Cfg.Storage.Engine == storage.StorageEngineChunks {
t.StoreQueryable = querier.NewChunkStoreQueryable(t.Cfg.Querier, t.Store)
return nil, nil
func (t *Cortex) initStoreQueryables() (services.Service, error) {
var servs []services.Service

//nolint:golint // I prefer this form over removing 'else', because it allows q to have smaller scope.
if q, err := initQueryableForEngine(t.Cfg.Storage.Engine, t.Cfg, t.Store, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to initialize querier for engine '%s': %v", t.Cfg.Storage.Engine, err)
} else {
t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q))
if s, ok := q.(services.Service); ok {
servs = append(servs, s)
}
}

if t.Cfg.Storage.Engine == storage.StorageEngineTSDB && !t.Cfg.TSDB.StoreGatewayEnabled {
storeQueryable, err := querier.NewBlockQueryable(t.Cfg.TSDB, t.Cfg.Server.LogLevel, prometheus.DefaultRegisterer)
if t.Cfg.Querier.SecondStoreEngine != "" {
if t.Cfg.Querier.SecondStoreEngine == t.Cfg.Storage.Engine {
return nil, fmt.Errorf("second store engine used by querier '%s' must be different than primary engine '%s'", t.Cfg.Querier.SecondStoreEngine, t.Cfg.Storage.Engine)
}

sq, err := initQueryableForEngine(t.Cfg.Querier.SecondStoreEngine, t.Cfg, t.Store, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize querier for engine '%s': %v", t.Cfg.Querier.SecondStoreEngine, err)
}

t.StoreQueryables = append(t.StoreQueryables, querier.UseBeforeTimestampQueryable(sq, time.Time(t.Cfg.Querier.UseSecondStoreBeforeTime)))

if s, ok := sq.(services.Service); ok {
servs = append(servs, s)
}
t.StoreQueryable = storeQueryable
return storeQueryable, nil
}

if t.Cfg.Storage.Engine == storage.StorageEngineTSDB && t.Cfg.TSDB.StoreGatewayEnabled {
// Return service, if any.
switch len(servs) {
case 0:
return nil, nil
case 1:
return servs[0], nil
default:
// No need to support this case yet, since chunk store is not a service.
// When we get there, we will need a wrapper service, that starts all subservices, and will also monitor them for failures.
// Not difficult, but also not necessary right now.
return nil, fmt.Errorf("too many services")
}
}

func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, reg prometheus.Registerer) (prom_storage.Queryable, error) {
switch engine {
case storage.StorageEngineChunks:
if chunkStore == nil {
return nil, fmt.Errorf("chunk store not initialized")
}
return querier.NewChunkStoreQueryable(cfg.Querier, chunkStore), nil

case storage.StorageEngineTSDB:
if !cfg.TSDB.StoreGatewayEnabled {
return querier.NewBlockQueryable(cfg.TSDB, cfg.Server.LogLevel, reg)
}

// When running in single binary, if the blocks sharding is disabled and no custom
// store-gateway address has been configured, we can set it to the running process.
if t.Cfg.Target == All && !t.Cfg.StoreGateway.ShardingEnabled && t.Cfg.Querier.StoreGatewayAddresses == "" {
t.Cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort)
if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
}

storeQueryable, err := querier.NewBlocksStoreQueryableFromConfig(t.Cfg.Querier, t.Cfg.StoreGateway, t.Cfg.TSDB, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
t.StoreQueryable = storeQueryable
return storeQueryable, nil
}
return querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, util.Logger, reg)

return nil, fmt.Errorf("unknown storage engine '%s'", t.Cfg.Storage.Engine)
default:
return nil, fmt.Errorf("unknown storage engine '%s'", engine)
}
}

func (t *Cortex) initIngester() (serv services.Service, err error) {
Expand Down Expand Up @@ -260,8 +299,8 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
return t.Flusher, nil
}

func (t *Cortex) initStore() (serv services.Service, err error) {
if t.Cfg.Storage.Engine == storage.StorageEngineTSDB {
func (t *Cortex) initChunkStore() (serv services.Service, err error) {
if t.Cfg.Storage.Engine != storage.StorageEngineChunks && t.Cfg.Querier.SecondStoreEngine != storage.StorageEngineChunks {
return nil, nil
}
err = t.Cfg.Schema.Load()
Expand Down Expand Up @@ -411,7 +450,7 @@ func (t *Cortex) initTableManager() (services.Service, error) {
func (t *Cortex) initRuler() (serv services.Service, err error) {
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, t.TombstonesLoader, prometheus.DefaultRegisterer)
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)

t.Ruler, err = ruler.NewRuler(t.Cfg.Ruler, engine, queryable, t.Distributor, prometheus.DefaultRegisterer, util.Logger)
if err != nil {
Expand Down Expand Up @@ -521,12 +560,12 @@ func (t *Cortex) setupModuleManager() error {
mm.RegisterModule(Ring, t.initRing)
mm.RegisterModule(Overrides, t.initOverrides)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Store, t.initStore)
mm.RegisterModule(Store, t.initChunkStore)
mm.RegisterModule(DeleteRequestsStore, t.initDeleteRequestsStore)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Flusher, t.initFlusher)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(StoreQueryable, t.initStoreQueryable)
mm.RegisterModule(StoreQueryable, t.initStoreQueryables)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Ruler, t.initRuler)
Expand Down
43 changes: 32 additions & 11 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"sort"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
Expand All @@ -29,17 +31,36 @@ type Distributor interface {
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &distributorQuerier{
distributor: distributor,
ctx: ctx,
mint: mint,
maxt: maxt,
streaming: streaming,
chunkIterFn: iteratorFn,
}, nil
})
func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngesterWithin time.Duration) QueryableWithFilter {
return distributorQueryable{
distributor: distributor,
streaming: streaming,
iteratorFn: iteratorFn,
queryIngesterWithin: queryIngesterWithin,
}
}

// distributorQueryable is the value returned by newDistributorQueryable as a
// QueryableWithFilter. Its Querier method builds distributorQuerier values, and its
// UseQueryable method decides whether a query should be sent to this queryable.
type distributorQueryable struct {
// distributor is handed to every distributorQuerier created by Querier.
distributor Distributor
// streaming is copied into every distributorQuerier created by Querier.
streaming bool
// iteratorFn becomes the chunkIterFn of every distributorQuerier created by Querier.
iteratorFn chunkIteratorFunc
// queryIngesterWithin is the look-back window checked by UseQueryable.
// Zero disables the check, so the queryable is always used.
queryIngesterWithin time.Duration
}

// Querier returns a distributorQuerier bound to ctx and to the [mint, maxt] range,
// configured with the distributor, streaming flag and chunk iterator function held by d.
// The returned error is always nil.
func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	q := &distributorQuerier{
		ctx:         ctx,
		mint:        mint,
		maxt:        maxt,
		distributor: d.distributor,
		streaming:   d.streaming,
		chunkIterFn: d.iteratorFn,
	}
	return q, nil
}

// UseQueryable reports whether the ingesters should be included in a query.
// With queryIngesterWithin set to zero they are always included. Otherwise they are
// included only if queryMaxT (in milliseconds) is not older than now minus
// queryIngesterWithin. The query's minimum time is ignored.
func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bool {
	if d.queryIngesterWithin == 0 {
		return true
	}

	// Oldest query max time, in milliseconds, for which ingesters are still queried.
	cutoff := util.TimeToMillis(now.Add(-d.queryIngesterWithin))
	return queryMaxT >= cutoff
}

type distributorQuerier struct {
Expand Down
22 changes: 20 additions & 2 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
)

Expand All @@ -38,7 +40,7 @@ func TestDistributorQuerier(t *testing.T) {
},
},
}
queryable := newDistributorQueryable(d, false, nil)
queryable := newDistributorQueryable(d, false, nil, 0)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

Expand All @@ -57,6 +59,22 @@ func TestDistributorQuerier(t *testing.T) {
require.NoError(t, seriesSet.Err())
}

func TestDistributorQueryableFilter(t *testing.T) {
d := &mockDistributor{}
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)

now := time.Now()

queryMinT := util.TimeToMillis(now.Add(-5 * time.Minute))
queryMaxT := util.TimeToMillis(now)

require.True(t, dq.UseQueryable(now, queryMinT, queryMaxT))
require.True(t, dq.UseQueryable(now.Add(time.Hour), queryMinT, queryMaxT))

// Same query, hour+1ms later, is not sent to ingesters.
require.False(t, dq.UseQueryable(now.Add(time.Hour).Add(1*time.Millisecond), queryMinT, queryMaxT))
}

func TestIngesterStreaming(t *testing.T) {
// We need to make sure that there is at least one chunk present,
// else no series will be selected.
Expand Down Expand Up @@ -87,7 +105,7 @@ func TestIngesterStreaming(t *testing.T) {
},
}
ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, mergeChunks)
queryable := newDistributorQueryable(d, true, mergeChunks, 0)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

Expand Down
Loading