Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ blocks_storage:
# CLI flag: -blocks-storage.filesystem.dir
[dir: <string> | default = ""]

# This configures how the store-gateway synchronizes blocks stored in the
# bucket.
# This configures how the querier and store-gateway discover and synchronize
# blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
Expand Down Expand Up @@ -587,6 +587,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: <duration> | default = 168h]

# How long to cache content of the bucket index.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
[bucket_index_content_ttl: <duration> | default = 5m]

# Maximum size of bucket index content to cache in bytes.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
Expand All @@ -596,6 +604,33 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

bucket_index:
# True to enable querier to discover blocks in the storage via bucket
# index instead of bucket scanning.
# CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
[enabled: <boolean> | default = false]

# How frequently a cached bucket index should be refreshed.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
[update_on_stale_interval: <duration> | default = 15m]

# How frequently a bucket index, which previously failed to load, should
# be tried to load again.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long an unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

# The maximum allowed age of a bucket index (last updated) before queries
# start failing because the bucket index is too old. The bucket index is
# periodically updated by the compactor, while this check is enforced in
# the querier (at query time).
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
39 changes: 37 additions & 2 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ blocks_storage:
# CLI flag: -blocks-storage.filesystem.dir
[dir: <string> | default = ""]

# This configures how the store-gateway synchronizes blocks stored in the
# bucket.
# This configures how the querier and store-gateway discover and synchronize
# blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
Expand Down Expand Up @@ -637,6 +637,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: <duration> | default = 168h]

# How long to cache content of the bucket index.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
[bucket_index_content_ttl: <duration> | default = 5m]

# Maximum size of bucket index content to cache in bytes.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
Expand All @@ -646,6 +654,33 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

bucket_index:
# True to enable querier to discover blocks in the storage via bucket
# index instead of bucket scanning.
# CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
[enabled: <boolean> | default = false]

# How frequently a cached bucket index should be refreshed.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
[update_on_stale_interval: <duration> | default = 15m]

# How frequently a bucket index, which previously failed to load, should
# be tried to load again.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long an unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

# The maximum allowed age of a bucket index (last updated) before queries
# start failing because the bucket index is too old. The bucket index is
# periodically updated by the compactor, while this check is enforced in
# the querier (at query time).
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
39 changes: 37 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3621,8 +3621,8 @@ filesystem:
# CLI flag: -blocks-storage.filesystem.dir
[dir: <string> | default = ""]

# This configures how the store-gateway synchronizes blocks stored in the
# bucket.
# This configures how the querier and store-gateway discover and synchronize
# blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
Expand Down Expand Up @@ -3866,6 +3866,14 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: <duration> | default = 168h]

# How long to cache content of the bucket index.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
[bucket_index_content_ttl: <duration> | default = 5m]

# Maximum size of bucket index content to cache in bytes.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to ignore
# blocks that are marked for deletion with some delay. This ensures store can
Expand All @@ -3875,6 +3883,33 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

bucket_index:
# True to enable querier to discover blocks in the storage via bucket index
# instead of bucket scanning.
# CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
[enabled: <boolean> | default = false]

# How frequently a cached bucket index should be refreshed.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
[update_on_stale_interval: <duration> | default = 15m]

# How frequently a bucket index, which previously failed to load, should be
# tried to load again.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long an unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

# The maximum allowed age of a bucket index (last updated) before queries
# start failing because the bucket index is too old. The bucket index is
# periodically updated by the compactor, while this check is enforced in the
# querier (at query time).
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
144 changes: 144 additions & 0 deletions pkg/querier/blocks_finder_bucket_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package querier

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
)

var (
// errBucketIndexBlocksFinderNotRunning is returned by GetBlocks when the
// finder service is not in the Running state.
errBucketIndexBlocksFinderNotRunning = errors.New("bucket index blocks finder is not running")
// errBucketIndexTooOld is returned by GetBlocks when the time elapsed since
// the bucket index was last updated exceeds the configured MaxStalePeriod.
errBucketIndexTooOld = errors.New("bucket index is too old and the last time it was updated exceeds the allowed max staleness")
)

// BucketIndexBlocksFinderConfig holds the settings used by BucketIndexBlocksFinder.
type BucketIndexBlocksFinderConfig struct {
// IndexLoader is the configuration passed to bucketindex.NewLoader.
IndexLoader bucketindex.LoaderConfig
// MaxStalePeriod is the maximum time allowed since the bucket index was
// last updated; beyond it GetBlocks fails with errBucketIndexTooOld.
MaxStalePeriod time.Duration
// IgnoreDeletionMarksDelay is the age of a deletion mark beyond which the
// marked block is excluded from the blocks returned by GetBlocks.
IgnoreDeletionMarksDelay time.Duration
}

// BucketIndexBlocksFinder implements the BlocksFinder interface and finds blocks
// in the bucket by looking up the bucket index.
type BucketIndexBlocksFinder struct {
services.Service

// cfg holds the staleness and deletion-mark settings applied by GetBlocks.
cfg BucketIndexBlocksFinderConfig
// loader returns the bucket index for a given user; it is managed as a subservice.
loader *bucketindex.Loader

// Subservices manager.
subservices *services.Manager
// subservicesWatcher delivers subservice failures to the running loop.
subservicesWatcher *services.FailureWatcher
}

// NewBucketIndexBlocksFinder creates a BucketIndexBlocksFinder which looks up
// blocks through the bucket index loaded from bkt. The returned finder is a
// service: GetBlocks only works once it has reached the Running state.
//
// An error is returned if the subservices manager cannot be created.
func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (*BucketIndexBlocksFinder, error) {
	f := &BucketIndexBlocksFinder{
		cfg:    cfg,
		loader: bucketindex.NewLoader(cfg.IndexLoader, bkt, logger, reg),
		// The watcher has to be created here: starting() and running() call
		// methods on it, and leaving the field unset would make them operate
		// on a nil pointer.
		subservicesWatcher: services.NewFailureWatcher(),
	}

	var err error
	f.subservices, err = services.NewManager(f.loader)
	if err != nil {
		return nil, err
	}

	f.Service = services.NewBasicService(f.starting, f.running, f.stopping)

	return f, nil
}

// starting is the service start hook. It registers the subservices manager
// with the failure watcher and then starts the subservices through
// services.StartManagerAndAwaitHealthy, wrapping any error it reports.
func (f *BucketIndexBlocksFinder) starting(ctx context.Context) error {
	f.subservicesWatcher.WatchManager(f.subservices)

	startErr := services.StartManagerAndAwaitHealthy(ctx, f.subservices)
	if startErr == nil {
		return nil
	}

	return errors.Wrap(startErr, "unable to start blocks index querier subservices")
}

// running is the service main loop. It blocks until either the service
// context is cancelled, in which case it returns nil, or the failure watcher
// reports a subservice failure, in which case that error is returned wrapped.
func (f *BucketIndexBlocksFinder) running(ctx context.Context) error {
	// Both branches return, so a single select is enough (no loop needed).
	select {
	case <-ctx.Done():
		return nil
	case err := <-f.subservicesWatcher.Chan():
		return errors.Wrap(err, "blocks index querier set subservice failed")
	}
}

// stopping is the service stop hook. It stops the subservices through
// services.StopManagerAndAwaitStopped and returns its result. The failure
// that caused the service to stop, if any, is ignored.
func (f *BucketIndexBlocksFinder) stopping(_ error) error {
	// A fresh background context is used rather than the service context.
	stopCtx := context.Background()

	return services.StopManagerAndAwaitStopped(stopCtx, f.subservices)
}

// GetBlocks implements BlocksFinder.
//
// It returns the blocks of userID's bucket index which are within [minT, maxT],
// together with the deletion marks of the returned blocks. Blocks whose
// deletion mark is older than IgnoreDeletionMarksDelay are left out. The
// returned blocks are in no particular order.
//
// It fails with errBucketIndexBlocksFinderNotRunning if the service is not
// running, errInvalidBlocksRange if maxT < minT, and errBucketIndexTooOld if
// the index was last updated more than MaxStalePeriod ago. A missing bucket
// index is not an error: nil blocks and nil marks are returned.
func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
	switch {
	case f.State() != services.Running:
		return nil, nil, errBucketIndexBlocksFinderNotRunning
	case maxT < minT:
		return nil, nil, errInvalidBlocksRange
	}

	// Load the bucket index of this user.
	index, err := f.loader.GetIndex(ctx, userID)
	switch {
	case errors.Is(err, bucketindex.ErrIndexNotFound):
		// Legit edge case: a new tenant which has not shipped any block to the
		// storage yet has no bucket index.
		return nil, nil, nil
	case err != nil:
		return nil, nil, err
	}

	// Refuse to answer from an index which has not been updated recently enough.
	if age := time.Since(index.GetUpdatedAt()); age > f.cfg.MaxStalePeriod {
		return nil, nil, errBucketIndexTooOld
	}

	// Collect the blocks containing samples within the requested range.
	inRange := make(map[ulid.ULID]*bucketindex.Block)
	for _, blk := range index.Blocks {
		if blk.Within(minT, maxT) {
			inRange[blk.ID] = blk
		}
	}

	// Walk the deletion marks of the collected blocks only. This is the same
	// logic as Thanos IgnoreDeletionMarkFilter: a block marked for longer than
	// the configured delay is dropped, otherwise its mark is returned.
	marks := make(map[ulid.ULID]*bucketindex.BlockDeletionMark)
	maxMarkAgeSeconds := f.cfg.IgnoreDeletionMarksDelay.Seconds()
	for _, mark := range index.BlockDeletionMarks {
		if _, found := inRange[mark.ID]; !found {
			continue
		}

		markAgeSeconds := time.Since(time.Unix(mark.DeletionTime, 0)).Seconds()
		if markAgeSeconds > maxMarkAgeSeconds {
			delete(inRange, mark.ID)
		} else {
			marks[mark.ID] = mark
		}
	}

	// Flatten the surviving blocks into a list.
	result := make(bucketindex.Blocks, 0, len(inRange))
	for _, blk := range inRange {
		result = append(result, blk)
	}

	return result, marks, nil
}
Loading