Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,16 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -693,6 +701,7 @@ type compactConfig struct {
wait bool
waitInterval time.Duration
disableDownsampling bool
blockListStrategy string
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
Expand Down Expand Up @@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down
25 changes: 23 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -56,6 +57,13 @@ const (
retryIntervalDuration = 10
)

type syncStrategy string

const (
concurrentDiscovery syncStrategy = "concurrent"
recursiveDiscovery syncStrategy = "recursive"
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
Expand All @@ -74,6 +82,7 @@ type storeConfig struct {
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
Expand Down Expand Up @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("15m").DurationVar(&sc.syncInterval)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"One of "+strategies
instead of
"One of"+strategies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

Expand Down Expand Up @@ -345,9 +358,17 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -423,7 +423,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -525,7 +525,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
Expand Down Expand Up @@ -669,7 +669,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
Expand Down Expand Up @@ -848,7 +848,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down Expand Up @@ -1391,7 +1391,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
117 changes: 93 additions & 24 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string {
}
}

// Fetcher interface to retieve blockId information from a bucket.
type BlockIDsFetcher interface {
// GetActiveBlocksIDs returning it via channel (streaming) and response.
// Lister lists block IDs from a bucket.
type Lister interface {
// GetActiveAndPartialBlockIDs GetActiveBlocksIDs returning it via channel (streaming) and response.
// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
}

type BaseBlockIDsFetcher struct {
// RecursiveLister lists block IDs by recursively iterating through a bucket.
type RecursiveLister struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
}

func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
return &BaseBlockIDsFetcher{
func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveLister {
return &RecursiveLister{
logger: logger,
bkt: bkt,
}
}

func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
partialBlocks = make(map[ulid.ULID]bool)
err = f.bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
Expand All @@ -216,6 +217,74 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c
return partialBlocks, err
}

// ConcurrentLister lists block IDs by doing a top level iteration of the bucket
// followed by one Exists call for each discovered block to detect partial blocks.
type ConcurrentLister struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
}

func NewConcurrentLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *ConcurrentLister {
return &ConcurrentLister{
logger: logger,
bkt: bkt,
}
}

func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
const concurrency = 64

partialBlocks = make(map[ulid.ULID]bool)
var (
metaChan = make(chan ulid.ULID, concurrency)
eg, gCtx = errgroup.WithContext(ctx)
mu sync.Mutex
)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for uid := range metaChan {
// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
// TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
metaFile := path.Join(uid.String(), MetaFilename)
ok, err := f.bkt.Exists(gCtx, metaFile)
if err != nil {
return errors.Wrapf(err, "meta.json file exists: %v", uid)
}
if !ok {
mu.Lock()
partialBlocks[uid] = true
mu.Unlock()
continue
}
ch <- uid
}
return nil
})
}

if err = f.bkt.Iter(ctx, "", func(name string) error {
id, ok := IsBlockDir(name)
if !ok {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case metaChan <- id:
}
return nil
}); err != nil {
return nil, err
}
close(metaChan)

if err := eg.Wait(); err != nil {
return nil, err
}
return partialBlocks, nil
}

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
Expand All @@ -234,10 +303,10 @@ type MetadataFilter interface {
// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Go-routine safe.
type BaseFetcher struct {
logger log.Logger
concurrency int
bkt objstore.InstrumentedBucketReader
blockIDsFetcher BlockIDsFetcher
logger log.Logger
concurrency int
bkt objstore.InstrumentedBucketReader
blockIDsLister Lister

// Optional local directory to cache meta.json files.
cacheDir string
Expand All @@ -249,12 +318,12 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg))
}

// NewBaseFetcherWithMetrics constructs BaseFetcher.
func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister Lister, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -268,24 +337,24 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
}

return &BaseFetcher{
logger: log.With(logger, "component", "block.BaseFetcher"),
concurrency: concurrency,
bkt: bkt,
blockIDsFetcher: blockIDsFetcher,
cacheDir: cacheDir,
cached: map[ulid.ULID]*metadata.Meta{},
syncs: metrics.Syncs,
logger: log.With(logger, "component", "block.BaseFetcher"),
concurrency: concurrency,
bkt: bkt,
blockIDsLister: blockIDsLister,
cacheDir: cacheDir,
cached: map[ulid.ULID]*metadata.Meta{},
syncs: metrics.Syncs,
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) {
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister) (*MetaFetcher, error) {
return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil)
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg)
if err != nil {
return nil, err
Expand All @@ -294,7 +363,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
}

// NewMetaFetcherWithMetrics returns meta fetcher.
func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics)
if err != nil {
return nil, err
Expand Down Expand Up @@ -445,7 +514,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch)
return err
})

Expand Down
3 changes: 2 additions & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/objstore/objtesting"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
r := prometheus.NewRegistry()
noopLogger := log.NewNopLogger()
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt)
baseBlockIDsFetcher := NewConcurrentLister(noopLogger, insBkt)
baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
testutil.Ok(t, err)

Expand Down
Loading