Merged
12 changes: 6 additions & 6 deletions pkg/querier/block.go
@@ -27,13 +27,13 @@ import (
type BlockQueryable struct {
services.Service

us *UserStore
us *BucketStoresService
}

// NewBlockQueryable returns a client to query a block store
func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQueryable, error) {
util.WarnExperimentalUse("Blocks storage engine")
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", util.Logger)
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-bucket-stores", util.Logger)
if err != nil {
return nil, err
}
@@ -42,7 +42,7 @@ func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prome
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_querier_", registerer))
}

us, err := NewUserStore(cfg, bucketClient, logLevel, util.Logger, registerer)
us, err := NewBucketStoresService(cfg, bucketClient, logLevel, util.Logger, registerer)
if err != nil {
return nil, err
}
@@ -54,11 +54,11 @@ func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prome
}

func (b *BlockQueryable) starting(ctx context.Context) error {
return errors.Wrap(services.StartAndAwaitRunning(ctx, b.us), "failed to start UserStore")
return errors.Wrap(services.StartAndAwaitRunning(ctx, b.us), "failed to start BucketStoresService")
}

func (b *BlockQueryable) stopping(_ error) error {
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), b.us), "stopping UserStore")
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), b.us), "stopping BucketStoresService")
}

// Querier returns a new Querier on the storage.
@@ -85,7 +85,7 @@ type blocksQuerier struct {
ctx context.Context
mint, maxt int64
userID string
userStores *UserStore
userStores *BucketStoresService
}

func (b *blocksQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
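For context on the lifecycle wiring above: BlockQueryable implements the Cortex services.Service interface and simply forwards start/stop to the wrapped store. A minimal sketch of that delegation pattern follows; the Queryable and inner names are illustrative, not from this PR, and it assumes the services package's NewIdleService constructor.

package example

import (
	"context"

	"github.com/pkg/errors"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// Queryable forwards its lifecycle to a wrapped inner service, mirroring
// the starting/stopping methods shown in block.go above.
type Queryable struct {
	services.Service

	inner services.Service // e.g. a BucketStoresService-like dependency
}

func NewQueryable(inner services.Service) *Queryable {
	q := &Queryable{inner: inner}
	// NewIdleService invokes starting on startup and stopping on shutdown,
	// with no run loop in between.
	q.Service = services.NewIdleService(q.starting, q.stopping)
	return q
}

func (q *Queryable) starting(ctx context.Context) error {
	// Block until the dependency is Running, so queries never hit a
	// half-initialized store.
	return errors.Wrap(services.StartAndAwaitRunning(ctx, q.inner), "failed to start inner service")
}

func (q *Queryable) stopping(_ error) error {
	// Symmetrically, wait for the dependency to terminate on shutdown.
	return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), q.inner), "stopping inner service")
}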
144 changes: 54 additions & 90 deletions pkg/querier/block_store.go
@@ -2,7 +2,7 @@ package querier

import (
"context"
"io"
"fmt"
"path/filepath"
"strings"
"sync"
@@ -13,32 +13,29 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

// UserStore is a multi-tenant version of Thanos BucketStore
type UserStore struct {
services.Service

// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
type BucketStores struct {
logger log.Logger
cfg tsdb.Config
bucket objstore.Bucket
logLevel logging.Level
bucketStoreMetrics *BucketStoreMetrics
metaFetcherMetrics *metaFetcherMetrics
indexCacheMetrics prometheus.Collector
filters []block.MetadataFilter

// Index cache shared across all tenants.
indexCache storecache.IndexCache
@@ -51,20 +48,21 @@ type UserStore struct {
syncTimes prometheus.Histogram
}

// NewUserStore returns a new UserStore
func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) {
// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
indexCacheRegistry := prometheus.NewRegistry()

u := &UserStore{
u := &BucketStores{
logger: logger,
cfg: cfg,
bucket: bucketClient,
filters: filters,
stores: map[string]*store.BucketStore{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: newMetaFetcherMetrics(),
indexCacheMetrics: tsdb.MustNewIndexCacheMetrics(cfg.BucketStore.IndexCache.Backend, indexCacheRegistry),
syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900},
@@ -77,30 +75,18 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
return nil, errors.Wrap(err, "create index cache")
}

if registerer != nil {
registerer.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics, u.indexCacheMetrics)
if reg != nil {
reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics, u.indexCacheMetrics)
}

u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, nil)
return u, nil
}

func (u *UserStore) starting(ctx context.Context) error {
if u.cfg.BucketStore.SyncInterval > 0 {
// Run an initial blocks sync, required in order to be able to serve queries.
if err := u.initialSync(ctx); err != nil {
return err
}
}

return nil
}

// initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them
func (u *UserStore) initialSync(ctx context.Context) error {
// InitialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) InitialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")

if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
if err := u.syncUsersBlocks(ctx, func(ctx context.Context, s *store.BucketStore) error {
return s.InitialSync(ctx)
}); err != nil {
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
@@ -111,45 +97,9 @@ func (u *UserStore) initialSync(ctx context.Context) error {
return nil
}

// syncStoresLoop periodically calls syncStores() to synchronize the blocks for all tenants.
func (u *UserStore) syncStoresLoop(ctx context.Context) error {
// If the sync is disabled we never sync blocks, which means the bucket store
// will be empty and no series will be returned once queried.
if u.cfg.BucketStore.SyncInterval <= 0 {
<-ctx.Done()
return nil
}

syncInterval := u.cfg.BucketStore.SyncInterval

// Since we've just run the initial sync, we should wait for the next
// sync interval before resyncing.
select {
case <-ctx.Done():
return nil
case <-time.After(syncInterval):
}

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
if err := u.syncStores(ctx); err != nil && err != io.EOF {
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
} else {
level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
}

return nil
})

// This should never occur because runutil.Repeat() returns an error
// only if the callback function returns one (which ours doesn't), but since
// we have to handle the error because of the linter, it's better to log it.
return errors.Wrap(err, "blocks synchronization has been halted due to an unexpected error")
}

// syncStores iterates over the storage bucket creating user bucket stores
func (u *UserStore) syncStores(ctx context.Context) error {
if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
// SyncBlocks synchronizes the stores' state with the Bucket store for every user.
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
if err := u.syncUsersBlocks(ctx, func(ctx context.Context, s *store.BucketStore) error {
return s.SyncBlocks(ctx)
}); err != nil {
return err
@@ -158,7 +108,7 @@ func (u *UserStore) syncStores(ctx context.Context) error {
return nil
}

func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
}(time.Now())
@@ -197,12 +147,12 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
return err
}

jobs <- job{
userID: user,
store: bs,
select {
case jobs <- job{userID: user, store: bs}:
return nil
case <-ctx.Done():
return ctx.Err()
}

return nil
})

// Wait until all workers completed.
@@ -212,34 +162,33 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
return err
}
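The change from a bare channel send to a select in this function is a correctness fix worth spelling out: the sync workers stop consuming from jobs once the context is canceled, so an unguarded jobs <- job{...} could block a producer forever. A toy, runnable illustration of the guarded-send pattern, with types and names invented for the example:

package main

import (
	"context"
	"fmt"
)

type job struct{ userID string }

// enqueue sends j on jobs, but aborts if ctx is canceled first, so the
// producer cannot block forever on a channel nobody reads anymore.
func enqueue(ctx context.Context, jobs chan<- job, j job) error {
	select {
	case jobs <- j:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	jobs := make(chan job) // unbuffered and, below, never received from

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown happening before the send

	// A bare `jobs <- job{...}` here would deadlock; the guarded send
	// returns promptly instead.
	if err := enqueue(ctx, jobs, job{userID: "user-1"}); err != nil {
		fmt.Println("enqueue aborted:", err) // prints: enqueue aborted: context canceled
	}
}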

// Series makes a series request to the underlying user store.
func (u *UserStore) Series(ctx context.Context, userID string, req *storepb.SeriesRequest) ([]*storepb.Series, storage.Warnings, error) {
log, ctx := spanlogger.New(ctx, "UserStore.Series")
// Series makes a series request to the underlying user bucket store.
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
log, ctx := spanlogger.New(srv.Context(), "BucketStores.Series")
defer log.Span.Finish()

store := u.getStore(userID)
if store == nil {
return nil, nil, nil
userID := getUserIDFromGRPCContext(ctx)
if userID == "" {
return fmt.Errorf("no userID")
}

srv := newBucketStoreSeriesServer(ctx)
err := store.Series(req, srv)
if err != nil {
return nil, nil, err
store := u.getStore(userID)
if store == nil {
return nil
}

return srv.SeriesSet, srv.Warnings, nil
return store.Series(req, srv)
}

func (u *UserStore) getStore(userID string) *store.BucketStore {
func (u *BucketStores) getStore(userID string) *store.BucketStore {
u.storesMu.RLock()
store := u.stores[userID]
u.storesMu.RUnlock()

return store
}

func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) {
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)
if bs != nil {
Expand Down Expand Up @@ -268,14 +217,15 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
userBkt,
filepath.Join(u.cfg.BucketStore.SyncDir, userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
fetcherReg,
[]block.MetadataFilter{
// List of filters to apply (order matters).
// The input filters MUST be before the ones we create here (order matters).
append(u.filters, []block.MetadataFilter{
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
block.NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay),
// Filters out duplicate blocks that can be formed from two or more overlapping
// blocks that fully submatch the source blocks of the older blocks.
// TODO(pracucci) can this cause troubles with the upcoming blocks sharding in the store-gateway?
Contributor: I don't think it will. I think the deduplicate filter removes old blocks if new blocks cover them completely, but we'd better verify it.

Contributor Author: I think we may have trouble with the consistency check, at least as I designed it. I will write down different scenarios in tests and let's see (once we reach the point of building the consistency check).

block.NewDeduplicateFilter(),
},
}...),
)
if err != nil {
return nil, err
@@ -309,3 +259,17 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)

return bs, nil
}

func getUserIDFromGRPCContext(ctx context.Context) string {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}

values := meta.Get(tsdb.TenantIDExternalLabel)
if len(values) != 1 {
return ""
}

return values[0]
}
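getUserIDFromGRPCContext expects the tenant ID to arrive as incoming gRPC metadata under the tsdb.TenantIDExternalLabel key. The client-side injection is not part of this diff; below is a hedged sketch of what a caller could do so the lookup above succeeds. withTenantID is a hypothetical helper, not a function from this PR.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// withTenantID attaches the tenant ID as outgoing gRPC metadata, under the
// same key getUserIDFromGRPCContext reads on the receiving side.
func withTenantID(ctx context.Context, userID string) context.Context {
	return metadata.AppendToOutgoingContext(ctx, tsdb.TenantIDExternalLabel, userID)
}

func main() {
	ctx := withTenantID(context.Background(), "user-1")

	// Outgoing metadata on the client becomes incoming metadata on the server.
	md, _ := metadata.FromOutgoingContext(ctx)
	fmt.Println(md.Get(tsdb.TenantIDExternalLabel)) // [user-1]
}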