Skip to content

Commit 50f53db

Browse files
authored
Decoupled BucketStores from UserStore (#2421)
* De-coupled BucketStores from UserStore

  Signed-off-by: Marco Pracucci <[email protected]>

* Handle context done in syncUsersBlocks()

  Signed-off-by: Marco Pracucci <[email protected]>

* Renamed UserStore to BucketStoresService

  Signed-off-by: Marco Pracucci <[email protected]>
1 parent cc68575 commit 50f53db

File tree

6 files changed: 464 additions and 173 deletions

pkg/querier/block.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,13 +27,13 @@ import (
2727
type BlockQueryable struct {
2828
services.Service
2929

30-
us *UserStore
30+
us *BucketStoresService
3131
}
3232

3333
// NewBlockQueryable returns a client to query a block store
3434
func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQueryable, error) {
3535
util.WarnExperimentalUse("Blocks storage engine")
36-
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", util.Logger)
36+
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-bucket-stores", util.Logger)
3737
if err != nil {
3838
return nil, err
3939
}
@@ -42,7 +42,7 @@ func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prome
4242
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_querier_", registerer))
4343
}
4444

45-
us, err := NewUserStore(cfg, bucketClient, logLevel, util.Logger, registerer)
45+
us, err := NewBucketStoresService(cfg, bucketClient, logLevel, util.Logger, registerer)
4646
if err != nil {
4747
return nil, err
4848
}
@@ -54,11 +54,11 @@ func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prome
5454
}
5555

5656
func (b *BlockQueryable) starting(ctx context.Context) error {
57-
return errors.Wrap(services.StartAndAwaitRunning(ctx, b.us), "failed to start UserStore")
57+
return errors.Wrap(services.StartAndAwaitRunning(ctx, b.us), "failed to start BucketStoresService")
5858
}
5959

6060
func (b *BlockQueryable) stopping(_ error) error {
61-
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), b.us), "stopping UserStore")
61+
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), b.us), "stopping BucketStoresService")
6262
}
6363

6464
// Querier returns a new Querier on the storage.
@@ -85,7 +85,7 @@ type blocksQuerier struct {
8585
ctx context.Context
8686
mint, maxt int64
8787
userID string
88-
userStores *UserStore
88+
userStores *BucketStoresService
8989
}
9090

9191
func (b *blocksQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {

pkg/querier/block_store.go

Lines changed: 54 additions & 90 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ package querier
22

33
import (
44
"context"
5-
"io"
5+
"fmt"
66
"path/filepath"
77
"strings"
88
"sync"
@@ -13,32 +13,29 @@ import (
1313
"github.com/pkg/errors"
1414
"github.com/prometheus/client_golang/prometheus"
1515
"github.com/prometheus/client_golang/prometheus/promauto"
16-
"github.com/prometheus/prometheus/storage"
1716
"github.com/thanos-io/thanos/pkg/block"
1817
"github.com/thanos-io/thanos/pkg/objstore"
19-
"github.com/thanos-io/thanos/pkg/runutil"
2018
"github.com/thanos-io/thanos/pkg/store"
2119
storecache "github.com/thanos-io/thanos/pkg/store/cache"
2220
"github.com/thanos-io/thanos/pkg/store/storepb"
2321
"github.com/weaveworks/common/logging"
22+
"google.golang.org/grpc/metadata"
2423

2524
"github.com/cortexproject/cortex/pkg/storage/tsdb"
2625
"github.com/cortexproject/cortex/pkg/util"
27-
"github.com/cortexproject/cortex/pkg/util/services"
2826
"github.com/cortexproject/cortex/pkg/util/spanlogger"
2927
)
3028

31-
// UserStore is a multi-tenant version of Thanos BucketStore
32-
type UserStore struct {
33-
services.Service
34-
29+
// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
30+
type BucketStores struct {
3531
logger log.Logger
3632
cfg tsdb.Config
3733
bucket objstore.Bucket
3834
logLevel logging.Level
3935
bucketStoreMetrics *BucketStoreMetrics
4036
metaFetcherMetrics *metaFetcherMetrics
4137
indexCacheMetrics prometheus.Collector
38+
filters []block.MetadataFilter
4239

4340
// Index cache shared across all tenants.
4441
indexCache storecache.IndexCache
@@ -51,20 +48,21 @@ type UserStore struct {
5148
syncTimes prometheus.Histogram
5249
}
5350

54-
// NewUserStore returns a new UserStore
55-
func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) {
51+
// NewBucketStores makes a new BucketStores.
52+
func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
5653
indexCacheRegistry := prometheus.NewRegistry()
5754

58-
u := &UserStore{
55+
u := &BucketStores{
5956
logger: logger,
6057
cfg: cfg,
6158
bucket: bucketClient,
59+
filters: filters,
6260
stores: map[string]*store.BucketStore{},
6361
logLevel: logLevel,
6462
bucketStoreMetrics: NewBucketStoreMetrics(),
6563
metaFetcherMetrics: newMetaFetcherMetrics(),
6664
indexCacheMetrics: tsdb.MustNewIndexCacheMetrics(cfg.BucketStore.IndexCache.Backend, indexCacheRegistry),
67-
syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
65+
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
6866
Name: "cortex_querier_blocks_sync_seconds",
6967
Help: "The total time it takes to perform a sync stores",
7068
Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900},
@@ -77,30 +75,18 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
7775
return nil, errors.Wrap(err, "create index cache")
7876
}
7977

80-
if registerer != nil {
81-
registerer.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics, u.indexCacheMetrics)
78+
if reg != nil {
79+
reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics, u.indexCacheMetrics)
8280
}
8381

84-
u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, nil)
8582
return u, nil
8683
}
8784

88-
func (u *UserStore) starting(ctx context.Context) error {
89-
if u.cfg.BucketStore.SyncInterval > 0 {
90-
// Run an initial blocks sync, required in order to be able to serve queries.
91-
if err := u.initialSync(ctx); err != nil {
92-
return err
93-
}
94-
}
95-
96-
return nil
97-
}
98-
99-
// initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them
100-
func (u *UserStore) initialSync(ctx context.Context) error {
85+
// InitialSync does an initial synchronization of blocks for all users.
86+
func (u *BucketStores) InitialSync(ctx context.Context) error {
10187
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
10288

103-
if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
89+
if err := u.syncUsersBlocks(ctx, func(ctx context.Context, s *store.BucketStore) error {
10490
return s.InitialSync(ctx)
10591
}); err != nil {
10692
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
@@ -111,45 +97,9 @@ func (u *UserStore) initialSync(ctx context.Context) error {
11197
return nil
11298
}
11399

114-
// syncStoresLoop periodically calls syncStores() to synchronize the blocks for all tenants.
115-
func (u *UserStore) syncStoresLoop(ctx context.Context) error {
116-
// If the sync is disabled we never sync blocks, which means the bucket store
117-
// will be empty and no series will be returned once queried.
118-
if u.cfg.BucketStore.SyncInterval <= 0 {
119-
<-ctx.Done()
120-
return nil
121-
}
122-
123-
syncInterval := u.cfg.BucketStore.SyncInterval
124-
125-
// Since we've just run the initial sync, we should wait the next
126-
// sync interval before resynching.
127-
select {
128-
case <-ctx.Done():
129-
return nil
130-
case <-time.After(syncInterval):
131-
}
132-
133-
err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
134-
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
135-
if err := u.syncStores(ctx); err != nil && err != io.EOF {
136-
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
137-
} else {
138-
level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
139-
}
140-
141-
return nil
142-
})
143-
144-
// This should never occur because the rununtil.Repeat() returns error
145-
// only if the callback function returns error (which doesn't), but since
146-
// we have to handle the error because of the linter, it's better to log it.
147-
return errors.Wrap(err, "blocks synchronization has been halted due to an unexpected error")
148-
}
149-
150-
// syncStores iterates over the storage bucket creating user bucket stores
151-
func (u *UserStore) syncStores(ctx context.Context) error {
152-
if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
100+
// SyncBlocks synchronizes the stores state with the Bucket store for every user.
101+
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
102+
if err := u.syncUsersBlocks(ctx, func(ctx context.Context, s *store.BucketStore) error {
153103
return s.SyncBlocks(ctx)
154104
}); err != nil {
155105
return err
@@ -158,7 +108,7 @@ func (u *UserStore) syncStores(ctx context.Context) error {
158108
return nil
159109
}
160110

161-
func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
111+
func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
162112
defer func(start time.Time) {
163113
u.syncTimes.Observe(time.Since(start).Seconds())
164114
}(time.Now())
@@ -197,12 +147,12 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
197147
return err
198148
}
199149

200-
jobs <- job{
201-
userID: user,
202-
store: bs,
150+
select {
151+
case jobs <- job{userID: user, store: bs}:
152+
return nil
153+
case <-ctx.Done():
154+
return ctx.Err()
203155
}
204-
205-
return nil
206156
})
207157

208158
// Wait until all workers completed.
@@ -212,34 +162,33 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
212162
return err
213163
}
214164

215-
// Series makes a series request to the underlying user store.
216-
func (u *UserStore) Series(ctx context.Context, userID string, req *storepb.SeriesRequest) ([]*storepb.Series, storage.Warnings, error) {
217-
log, ctx := spanlogger.New(ctx, "UserStore.Series")
165+
// Series makes a series request to the underlying user bucket store.
166+
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
167+
log, ctx := spanlogger.New(srv.Context(), "BucketStores.Series")
218168
defer log.Span.Finish()
219169

220-
store := u.getStore(userID)
221-
if store == nil {
222-
return nil, nil, nil
170+
userID := getUserIDFromGRPCContext(ctx)
171+
if userID == "" {
172+
return fmt.Errorf("no userID")
223173
}
224174

225-
srv := newBucketStoreSeriesServer(ctx)
226-
err := store.Series(req, srv)
227-
if err != nil {
228-
return nil, nil, err
175+
store := u.getStore(userID)
176+
if store == nil {
177+
return nil
229178
}
230179

231-
return srv.SeriesSet, srv.Warnings, nil
180+
return store.Series(req, srv)
232181
}
233182

234-
func (u *UserStore) getStore(userID string) *store.BucketStore {
183+
func (u *BucketStores) getStore(userID string) *store.BucketStore {
235184
u.storesMu.RLock()
236185
store := u.stores[userID]
237186
u.storesMu.RUnlock()
238187

239188
return store
240189
}
241190

242-
func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) {
191+
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
243192
// Check if the store already exists.
244193
bs := u.getStore(userID)
245194
if bs != nil {
@@ -268,14 +217,15 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
268217
userBkt,
269218
filepath.Join(u.cfg.BucketStore.SyncDir, userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
270219
fetcherReg,
271-
[]block.MetadataFilter{
272-
// List of filters to apply (order matters).
220+
// The input filters MUST be before the ones we create here (order matters).
221+
append(u.filters, []block.MetadataFilter{
273222
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
274223
block.NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay),
275224
// Filters out duplicate blocks that can be formed from two or more overlapping
276225
// blocks that fully submatches the source blocks of the older blocks.
226+
// TODO(pracucci) can this cause troubles with the upcoming blocks sharding in the store-gateway?
277227
block.NewDeduplicateFilter(),
278-
},
228+
}...),
279229
)
280230
if err != nil {
281231
return nil, err
@@ -309,3 +259,17 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
309259

310260
return bs, nil
311261
}
262+
263+
func getUserIDFromGRPCContext(ctx context.Context) string {
264+
meta, ok := metadata.FromIncomingContext(ctx)
265+
if !ok {
266+
return ""
267+
}
268+
269+
values := meta.Get(tsdb.TenantIDExternalLabel)
270+
if len(values) != 1 {
271+
return ""
272+
}
273+
274+
return values[0]
275+
}

0 commit comments

Comments
 (0)