From d5168cfb4d712c79428e4b5de1c72c271fd0357a Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Tue, 13 Feb 2024 09:33:39 +0100
Subject: [PATCH 1/5] Allow using different listing strategies

Signed-off-by: Filip Petkovski
---
 cmd/thanos/main_test.go         |   1 +
 pkg/block/fetcher.go            | 106 ++++++++++++++++++++++++++------
 pkg/block/fetcher_test.go       |   1 +
 pkg/compact/clean_test.go       |   1 +
 pkg/compact/compact_e2e_test.go |   1 +
 pkg/compact/retention_test.go   |   1 +
 pkg/store/acceptance_test.go    |   1 +
 7 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index d8d1fffef00..c36183cebcc 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/thanos-io/objstore"
 
 	"github.com/efficientgo/core/testutil"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 828503e91b3..72fec86ff3e 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string {
 	}
 }
 
-// Fetcher interface to retieve blockId information from a bucket.
+// BlockIDsFetcher lists block IDs from a bucket.
 type BlockIDsFetcher interface {
-	// GetActiveBlocksIDs returning it via channel (streaming) and response.
+	// GetActiveAndPartialBlockIDs returns active block IDs via the channel (streaming) and partial block IDs via the return value.
 	// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
 	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
 }
 
-type BaseBlockIDsFetcher struct {
+// RecursiveBlockIDsFetcher lists block IDs by iterating the object storage bucket recursively.
+type RecursiveBlockIDsFetcher struct {
 	logger log.Logger
 	bkt    objstore.InstrumentedBucketReader
 }
 
-func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
-	return &BaseBlockIDsFetcher{
+func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockIDsFetcher {
+	return &RecursiveBlockIDsFetcher{
 		logger: logger,
 		bkt:    bkt,
 	}
 }
 
-func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *RecursiveBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
 	partialBlocks = make(map[ulid.ULID]bool)
 	err = f.bkt.Iter(ctx, "", func(name string) error {
 		parts := strings.Split(name, "/")
@@ -216,6 +217,73 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c
 	return partialBlocks, err
 }
 
+// BaseBlockIDsFetcher lists block IDs by doing a top-level iteration of the bucket and using an Exists call per block to detect partial blocks.
+type BaseBlockIDsFetcher struct {
+	logger log.Logger
+	bkt    objstore.InstrumentedBucketReader
+}
+
+func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
+	return &BaseBlockIDsFetcher{
+		logger: logger,
+		bkt:    bkt,
+	}
+}
+
+func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+	const concurrency = 64
+
+	partialBlocks = make(map[ulid.ULID]bool)
+	var (
+		metaChan = make(chan ulid.ULID, concurrency)
+		eg, gCtx = errgroup.WithContext(ctx)
+		mu       sync.Mutex
+	)
+	for i := 0; i < concurrency; i++ {
+		eg.Go(func() error {
+			for uid := range metaChan {
+				// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
+				// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
+				// TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
+				metaFile := path.Join(uid.String(), MetaFilename)
+				ok, err := f.bkt.Exists(gCtx, metaFile)
+				if err != nil {
+					return errors.Wrapf(err, "meta.json file exists: %v", uid)
+				}
+				if !ok {
+					mu.Lock()
+					partialBlocks[uid] = true
+					mu.Unlock()
+					continue
+				}
+				ch <- uid
+			}
+			return nil
+		})
+	}
+
+	if err = f.bkt.Iter(ctx, "", func(name string) error {
+		id, ok := IsBlockDir(name)
+		if !ok {
+			return nil
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case metaChan <- id:
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+	close(metaChan)
+
+	if err := eg.Wait(); err != nil {
+		return nil, err
+	}
+	return partialBlocks, nil
+}
+
 type MetadataFetcher interface {
 	Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
 	UpdateOnChange(func([]metadata.Meta, error))
 }
@@ -234,10 +302,10 @@ type MetadataFilter interface {
 // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
 // Go-routine safe.
 type BaseFetcher struct {
-	logger          log.Logger
-	concurrency     int
-	bkt             objstore.InstrumentedBucketReader
-	blockIDsFetcher BlockIDsFetcher
+	logger         log.Logger
+	concurrency    int
+	bkt            objstore.InstrumentedBucketReader
+	blockIDsLister BlockIDsFetcher
 
 	// Optional local directory to cache meta.json files.
 	cacheDir string
@@ -254,7 +322,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
 }
 
 // NewBaseFetcherWithMetrics constructs BaseFetcher.
-func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
+func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -268,13 +336,13 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
 	}
 
 	return &BaseFetcher{
-		logger:          log.With(logger, "component", "block.BaseFetcher"),
-		concurrency:     concurrency,
-		bkt:             bkt,
-		blockIDsFetcher: blockIDsFetcher,
-		cacheDir:        cacheDir,
-		cached:          map[ulid.ULID]*metadata.Meta{},
-		syncs:           metrics.Syncs,
+		logger:         log.With(logger, "component", "block.BaseFetcher"),
+		concurrency:    concurrency,
+		bkt:            bkt,
+		blockIDsLister: blockIDsLister,
+		cacheDir:       cacheDir,
+		cached:         map[ulid.ULID]*metadata.Meta{},
+		syncs:          metrics.Syncs,
 	}, nil
 }
@@ -445,7 +513,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 	// Workers scheduled, distribute blocks.
 	eg.Go(func() error {
 		defer close(ch)
-		partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+		partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch)
 		return err
 	})
 
diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go
index 11384e88fac..130c9830dc0 100644
--- a/pkg/block/fetcher_test.go
+++ b/pkg/block/fetcher_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/thanos-io/objstore/objtesting"
 
 	"github.com/efficientgo/core/testutil"
+
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/model"
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go
index b66ca9f018a..ea3d1fa79d3 100644
--- a/pkg/compact/clean_test.go
+++ b/pkg/compact/clean_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/thanos-io/objstore"
 
 	"github.com/efficientgo/core/testutil"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 )
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index f1e01ec4f49..13e0cae6f61 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/thanos-io/objstore/objtesting"
 
 	"github.com/efficientgo/core/testutil"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/dedup"
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index c1936f095a9..b7b4464b364 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/thanos-io/objstore"
 
 	"github.com/efficientgo/core/testutil"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go
index be1a1179f1f..bcd164047d1 100644
--- a/pkg/store/acceptance_test.go
+++ b/pkg/store/acceptance_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/filesystem"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/component"
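
[Illustration — not part of the patch series] Patch 1 keeps both listing strategies behind one interface, so callers only pick a constructor. A minimal sketch of driving either lister follows; the in-memory bucket, nop logger, and main function are assumptions made for the example. NewBaseBlockIDsFetcher is the Exists-based lister added above, NewRecursiveLister the recursive one.

	package main

	import (
		"context"
		"fmt"

		"github.com/go-kit/log"
		"github.com/oklog/ulid"
		"github.com/thanos-io/objstore"

		"github.com/thanos-io/thanos/pkg/block"
	)

	func main() {
		logger := log.NewNopLogger()
		// An in-memory bucket stands in for real object storage.
		bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

		// Top-level Iter plus one Exists probe per block (the strategy added in this patch)...
		var lister block.BlockIDsFetcher = block.NewBaseBlockIDsFetcher(logger, bkt)
		// ...or a single recursive Iter over every object:
		// var lister block.BlockIDsFetcher = block.NewRecursiveLister(logger, bkt)

		ch := make(chan ulid.ULID)
		done := make(chan struct{})
		go func() {
			defer close(done)
			for id := range ch {
				fmt.Println("active block:", id) // streamed as soon as meta.json is confirmed
			}
		}()

		partial, err := lister.GetActiveAndPartialBlockIDs(context.Background(), ch)
		close(ch)
		<-done
		if err != nil {
			panic(err)
		}
		fmt.Println("partial blocks:", len(partial))
	}

The tradeoff: the Exists-based lister issues N+1 object-store calls (one Iter plus one HEAD per block, 64 in flight), while the recursive lister makes a single listing pass but must touch every object in the bucket.
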
From a972f17cc763a99779deea08fdd1a5060436b28c Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Thu, 15 Feb 2024 06:57:07 +0100
Subject: [PATCH 2/5] Expose flags for block list strategy

Signed-off-by: Filip Petkovski
---
 cmd/thanos/compact.go           | 16 ++++++++++++--
 cmd/thanos/downsample.go        |  2 +-
 cmd/thanos/main_test.go         |  4 ++--
 cmd/thanos/store.go             | 25 ++++++++++++++++++++--
 cmd/thanos/tools_bucket.go      | 12 +++++------
 pkg/block/fetcher.go            | 37 +++++++++++++++++----------------
 pkg/block/fetcher_test.go       |  2 +-
 pkg/compact/clean_test.go       |  2 +-
 pkg/compact/compact_e2e_test.go |  6 +++---
 pkg/compact/retention_test.go   |  2 +-
 pkg/replicate/replicator.go     |  2 +-
 pkg/store/acceptance_test.go    |  2 +-
 pkg/store/bucket_e2e_test.go    |  2 +-
 pkg/store/bucket_test.go        | 18 ++++++++--------
 14 files changed, 83 insertions(+), 49 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index ee8158de1fc..4d7370e936f 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -239,8 +239,16 @@ func runCompact(
 	consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
 	timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)
 
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
-	baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
+	var blockLister block.Lister
+	switch syncStrategy(conf.blockListStrategy) {
+	case concurrentDiscovery:
+		blockLister = block.NewConcurrentLister(logger, insBkt)
+	case recursiveDiscovery:
+		blockLister = block.NewRecursiveLister(logger, insBkt)
+	default:
+		return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
+	}
+	baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
 	if err != nil {
 		return errors.Wrap(err, "create meta fetcher")
 	}
@@ -693,6 +701,7 @@ type compactConfig struct {
 	wait                         bool
 	waitInterval                 time.Duration
 	disableDownsampling          bool
+	blockListStrategy            string
 	blockMetaFetchConcurrency    int
 	blockFilesConcurrency        int
 	blockViewerSyncBlockInterval time.Duration
@@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
 		Default("false").BoolVar(&cc.disableDownsampling)
 
+	strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
+	cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
+		Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
 	cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
 		Default("32").IntVar(&cc.blockMetaFetchConcurrency)
 	cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 6045bb29c11..3d6b1292617 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -90,7 +90,7 @@ func RunDownsample(
 	insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))
 
 	// While fetching blocks, filter out blocks that were marked for no downsample.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 	metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
 		block.NewDeduplicateFilter(block.FetcherConcurrency),
 		downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index c36183cebcc..1ced04637ba 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -158,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {
 		metrics := newDownsampleMetrics(prometheus.NewRegistry())
 		testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
 
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
 		metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
 		testutil.Ok(t, err)
 
@@ -198,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	metrics := newDownsampleMetrics(prometheus.NewRegistry())
 	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
 
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
 	metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
 	testutil.Ok(t, err)
 
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 9ad3960ff5b..03e75b1783b 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/alecthomas/units"
@@ -56,6 +57,13 @@ const (
 	retryIntervalDuration = 10
 )
 
+type syncStrategy string
+
+const (
+	concurrentDiscovery syncStrategy = "concurrent"
+	recursiveDiscovery  syncStrategy = "recursive"
+)
+
 type storeConfig struct {
 	indexCacheConfigs         extflag.PathOrContent
 	objStoreConfig            extflag.PathOrContent
@@ -74,6 +82,7 @@ type storeConfig struct {
 	component                 component.StoreAPI
 	debugLogging              bool
 	syncInterval              time.Duration
+	blockListStrategy         string
 	blockSyncConcurrency      int
 	blockMetaFetchConcurrency int
 	filterConf                *store.FilterConfig
@@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
 		Default("15m").DurationVar(&sc.syncInterval)
 
+	strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
+	cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
+		Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)
+
 	cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
 		Default("20").IntVar(&sc.blockSyncConcurrency)
 
@@ -345,9 +358,17 @@ func runStore(
 		return errors.Wrap(err, "create index cache")
 	}
 
+	var blockLister block.Lister
+	switch syncStrategy(conf.blockListStrategy) {
+	case concurrentDiscovery:
+		blockLister = block.NewConcurrentLister(logger, insBkt)
+	case recursiveDiscovery:
+		blockLister = block.NewRecursiveLister(logger, insBkt)
+	default:
+		return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
+	}
 	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
-	metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
+	metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
 		[]block.MetadataFilter{
 			block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
 			block.NewLabelShardedMetaFilter(relabelConfig),
diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go
index a266cb381f2..326e4b09ebf 100644
--- a/cmd/thanos/tools_bucket.go
+++ b/cmd/thanos/tools_bucket.go
@@ -365,7 +365,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
 		// We ignore any block that has the deletion marker file.
 		filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 		fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
 		if err != nil {
 			return err
@@ -423,7 +423,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
 			ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
 			filters = append(filters, ignoreDeletionMarkFilter)
 		}
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 		fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
 		if err != nil {
 			return err
@@ -525,7 +525,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 		}
 		insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))
 
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 		fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
 		if err != nil {
 			return err
@@ -669,7 +669,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
 			return err
 		}
 		// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 		fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
 			[]block.MetadataFilter{
 				block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
@@ -848,7 +848,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 		var sy *compact.Syncer
 		{
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 			baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
 			if err != nil {
 				return errors.Wrap(err, "create meta fetcher")
@@ -1391,7 +1391,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
 		var sy *compact.Syncer
 		{
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 			baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
 			if err != nil {
 				return errors.Wrap(err, "create meta fetcher")
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 72fec86ff3e..4911b4748b7 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -170,27 +170,27 @@ func DefaultModifiedLabelValues() [][]string {
 	}
 }
 
-// BlockIDsFetcher lists block IDs from a bucket.
-type BlockIDsFetcher interface {
+// Lister lists block IDs from a bucket.
+type Lister interface {
 	// GetActiveAndPartialBlockIDs returns active block IDs via the channel (streaming) and partial block IDs via the return value.
 	// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
 	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
 }
 
-// RecursiveBlockIDsFetcher lists block IDs by iterating the object storage bucket recursively.
-type RecursiveBlockIDsFetcher struct {
+// RecursiveLister lists block IDs by recursively iterating through a bucket.
+type RecursiveLister struct {
 	logger log.Logger
 	bkt    objstore.InstrumentedBucketReader
 }
 
-func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockIDsFetcher {
-	return &RecursiveBlockIDsFetcher{
+func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveLister {
+	return &RecursiveLister{
 		logger: logger,
 		bkt:    bkt,
 	}
 }
 
-func (f *RecursiveBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
 	partialBlocks = make(map[ulid.ULID]bool)
 	err = f.bkt.Iter(ctx, "", func(name string) error {
 		parts := strings.Split(name, "/")
@@ -217,20 +217,21 @@ func (f *RecursiveBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Conte
 	return partialBlocks, err
 }
 
-// BaseBlockIDsFetcher lists block IDs by doing a top-level iteration of the bucket and using an Exists call per block to detect partial blocks.
-type BaseBlockIDsFetcher struct {
+// ConcurrentLister lists block IDs by doing a top level iteration of the bucket
+// followed by one Exists call for each discovered block to detect partial blocks.
+type ConcurrentLister struct {
 	logger log.Logger
 	bkt    objstore.InstrumentedBucketReader
 }
 
-func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
-	return &BaseBlockIDsFetcher{
+func NewConcurrentLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *ConcurrentLister {
+	return &ConcurrentLister{
 		logger: logger,
 		bkt:    bkt,
 	}
 }
 
-func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
 	const concurrency = 64
 
 	partialBlocks = make(map[ulid.ULID]bool)
@@ -305,7 +306,7 @@ type BaseFetcher struct {
 	logger         log.Logger
 	concurrency    int
 	bkt            objstore.InstrumentedBucketReader
-	blockIDsLister BlockIDsFetcher
+	blockIDsLister Lister
 
 	// Optional local directory to cache meta.json files.
 	cacheDir string
@@ -317,12 +318,12 @@ type BaseFetcher struct {
 }
 
 // NewBaseFetcher constructs BaseFetcher.
-func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
+func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
 	return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg))
 }
 
 // NewBaseFetcherWithMetrics constructs BaseFetcher.
-func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
+func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister Lister, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -348,12 +349,12 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
 
 // NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
 // NOTE: Not suitable to use in production.
-func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) {
+func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister) (*MetaFetcher, error) {
 	return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil)
 }
 
 // NewMetaFetcher returns meta fetcher.
-func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
 	b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg)
 	if err != nil {
 		return nil, err
@@ -362,7 +363,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
 }
 
 // NewMetaFetcherWithMetrics returns meta fetcher.
-func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
+func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
 	b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics)
 	if err != nil {
 		return nil, err
diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go
index 130c9830dc0..5e24e265382 100644
--- a/pkg/block/fetcher_test.go
+++ b/pkg/block/fetcher_test.go
@@ -74,7 +74,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 			r := prometheus.NewRegistry()
 			noopLogger := log.NewNopLogger()
 			insBkt := objstore.WithNoopInstr(bkt)
-			baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt)
+			baseBlockIDsFetcher := NewConcurrentLister(noopLogger, insBkt)
 			baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
 			testutil.Ok(t, err)
 
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go
index ea3d1fa79d3..cd6135b702e 100644
--- a/pkg/compact/clean_test.go
+++ b/pkg/compact/clean_test.go
@@ -31,7 +31,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
 	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
 	logger := log.NewNopLogger()
 
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
 	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
 	testutil.Ok(t, err)
 
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 13e0cae6f61..12ef8961542 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -96,7 +96,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
 		insBkt := objstore.WithNoopInstr(bkt)
 
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(nil, insBkt)
 		metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
 			duplicateBlocksFilter,
 		})
@@ -198,7 +198,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
 		duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
 		noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
 		insBkt := objstore.WithNoopInstr(bkt)
-		baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+		baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 		metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
 			ignoreDeletionMarkFilter,
 			duplicateBlocksFilter,
@@ -510,7 +510,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
 	duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
 	insBkt := objstore.WithNoopInstr(bkt)
 
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 	metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
 		ignoreDeletionMarkFilter,
 		duplicateBlocksFilter,
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index b7b4464b364..d80895617ce 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -246,7 +246,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
 				uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
 			}
 
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
 			metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
 			testutil.Ok(t, err)
 
diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go
index 1f8cdef2e42..668d64afce9 100644
--- a/pkg/replicate/replicator.go
+++ b/pkg/replicate/replicator.go
@@ -244,7 +244,7 @@ func newMetaFetcher(
 	if ignoreMarkedForDeletion {
 		filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
 	}
-	baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
+	baseBlockIDsFetcher := thanosblock.NewConcurrentLister(logger, fromBkt)
 	return thanosblock.NewMetaFetcher(
 		logger,
 		concurrency,
diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go
index bcd164047d1..46cbe9490ae 100644
--- a/pkg/store/acceptance_test.go
+++ b/pkg/store/acceptance_test.go
@@ -902,7 +902,7 @@ func TestBucketStore_Acceptance(t *testing.T) {
 			testutil.Ok(tt, err)
 
 			insBkt := objstore.WithNoopInstr(bkt)
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 			metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
 				block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
 			})
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 02c182cd0b5..c91fb4096df 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -154,7 +154,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	}
 
 	insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(s.logger, insBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(s.logger, insBkt)
 	metaFetcher, err := block.NewMetaFetcher(s.logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
 		block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
 		block.NewLabelShardedMetaFilter(relabelConfig),
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index d38e14587d7..5a665fb1e0d 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -883,7 +883,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 			rec := &recorder{Bucket: bkt}
 			insBkt := objstore.WithNoopInstr(bkt)
 
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 			metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
 				block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
 				block.NewLabelShardedMetaFilter(relabelConf),
@@ -1441,7 +1441,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
 	}
 
 	ibkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, ibkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, ibkt)
 	f, err := block.NewRawMetaFetcher(logger, ibkt, baseBlockIDsFetcher)
 	testutil.Ok(t, err)
 
@@ -1891,7 +1891,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
 	)
 
 	// Instance a real bucket store we'll use to query the series.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
 	fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
 	testutil.Ok(tb, err)
 
@@ -1983,7 +1983,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
 	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc))
 
 	// Instance a real bucket store we'll use to query the series.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
 	fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
 	testutil.Ok(tb, err)
 
@@ -2142,7 +2142,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) {
 			}
 
 			// Instance a real bucket store we'll use to query the series.
-			baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt)
+			baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
 			fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
 			testutil.Ok(tb, err)
 
@@ -2329,7 +2329,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
 	}
 
 	// Instance a real bucket store we'll use to query back the series.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
 	fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
 	testutil.Ok(tb, err)
 
@@ -2546,7 +2546,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) {
 	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc))
 
 	// Instance a real bucket store we'll use to query the series.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
 	fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
 	testutil.Ok(tb, err)
 
@@ -3522,7 +3522,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
 	testutil.Ok(t, err)
 
 	insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 	metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
 		block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
 	})
@@ -3738,7 +3738,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
 	testutil.Ok(t, err)
 
 	insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
 	metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
 		block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
 	})
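
[Illustration — not part of the patch series] The strategy switch in patch 2 appears verbatim in both cmd/thanos/compact.go and cmd/thanos/store.go. A sketch of how the two call sites could share one helper, under the assumption that the syncStrategy type and its constants are visible to both commands (the function name newBlockLister is hypothetical):

	// newBlockLister maps a --block-discovery-strategy value onto a block.Lister.
	func newBlockLister(strategy string, logger log.Logger, insBkt objstore.InstrumentedBucketReader) (block.Lister, error) {
		switch syncStrategy(strategy) {
		case concurrentDiscovery:
			// One top-level Iter, then concurrent Exists probes for meta.json.
			return block.NewConcurrentLister(logger, insBkt), nil
		case recursiveDiscovery:
			// One recursive Iter over all objects; no per-block Exists calls.
			return block.NewRecursiveLister(logger, insBkt), nil
		default:
			return nil, errors.Errorf("unknown sync strategy %s", strategy)
		}
	}

Both call sites would then reduce to blockLister, err := newBlockLister(conf.blockListStrategy, logger, insBkt).
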
From 387b78d17f111e4b87e97826ea8dbaca26673d5e Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Thu, 15 Feb 2024 08:39:58 +0100
Subject: [PATCH 3/5] Run make docs

Signed-off-by: Filip Petkovski
---
 docs/components/compact.md | 9 +++++++++
 docs/components/store.md   | 9 +++++++++
 2 files changed, 18 insertions(+)

diff --git a/docs/components/compact.md b/docs/components/compact.md
index d210dd55a83..f307a32e3ef 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -279,6 +279,15 @@ usage: thanos compact [<flags>]
 Continuously compacts blocks in an object store bucket.
 
 Flags:
+      --block-discovery-strategy="concurrent"
+                                 One ofconcurrent, recursive. When set to
+                                 concurrent, stores will concurrently issue
+                                 one call per directory to discover active
+                                 blocks in the bucket. The recursive strategy
+                                 iterates through all objects in the bucket,
+                                 recursively traversing into each directory.
+                                 This avoids N+1 calls at the expense of having
+                                 slower bucket iterations.
       --block-files-concurrency=1
                                  Number of goroutines to use when
                                  fetching/uploading block files from object
diff --git a/docs/components/store.md b/docs/components/store.md
index 8ecc53d68fd..408953ff19d 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -29,6 +29,15 @@ Store node giving access to blocks in a bucket provider. Now supported GCS, S3,
 Azure, Swift, Tencent COS and Aliyun OSS.
 
 Flags:
+      --block-discovery-strategy="concurrent"
+                                 One ofconcurrent, recursive. When set to
+                                 concurrent, stores will concurrently issue
+                                 one call per directory to discover active
+                                 blocks in the bucket. The recursive strategy
+                                 iterates through all objects in the bucket,
+                                 recursively traversing into each directory.
+                                 This avoids N+1 calls at the expense of having
+                                 slower bucket iterations.
      --block-meta-fetch-concurrency=32
                                  Number of goroutines to use when fetching
                                  block metadata from object storage.
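
[Illustration — not part of the patch series] As documented above, the flag defaults to "concurrent", so opting into the cheaper-but-slower recursive listing is a one-flag change. bucket.yml below is a placeholder for a real objstore configuration file:

	thanos store \
	  --objstore.config-file=bucket.yml \
	  --block-discovery-strategy=recursive
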
From 2ec23438046534e7178654f8ba70022eed19fd84 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Thu, 15 Feb 2024 12:49:10 +0100
Subject: [PATCH 4/5] Fix whitespace

Signed-off-by: Filip Petkovski
---
 cmd/thanos/compact.go      | 2 +-
 cmd/thanos/store.go        | 2 +-
 docs/components/compact.md | 2 +-
 docs/components/store.md   | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 4d7370e936f..8923bd376e5 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -764,7 +764,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("false").BoolVar(&cc.disableDownsampling)
 
 	strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
-	cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
+	cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
 		Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
 	cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
 		Default("32").IntVar(&cc.blockMetaFetchConcurrency)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 03e75b1783b..1c32e28c0f6 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -147,7 +147,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("15m").DurationVar(&sc.syncInterval)
 
 	strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
-	cmd.Flag("block-discovery-strategy", "One of"+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
+	cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
 		Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)
 
 	cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
diff --git a/docs/components/compact.md b/docs/components/compact.md
index f307a32e3ef..91e6fd04c64 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -280,7 +280,7 @@ Continuously compacts blocks in an object store bucket.
 
 Flags:
       --block-discovery-strategy="concurrent"
-                                 One ofconcurrent, recursive. When set to
+                                 One of concurrent, recursive. When set to
                                  concurrent, stores will concurrently issue
                                  one call per directory to discover active
                                  blocks in the bucket. The recursive strategy
diff --git a/docs/components/store.md b/docs/components/store.md
index 408953ff19d..cf96bfdf4c1 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -30,7 +30,7 @@ Azure, Swift, Tencent COS and Aliyun OSS.
 
 Flags:
       --block-discovery-strategy="concurrent"
-                                 One ofconcurrent, recursive. When set to
+                                 One of concurrent, recursive. When set to
                                  concurrent, stores will concurrently issue
                                  one call per directory to discover active
                                  blocks in the bucket. The recursive strategy

From a3e967295e2d721c6188a21973eac8f5991d1b1a Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Mon, 19 Feb 2024 09:37:15 +0100
Subject: [PATCH 5/5] Add CHANGELOG entry

Signed-off-by: Filip Petkovski
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 683ce6bbae3..63f72e6b317 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached.
 - [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
 - [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
+- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
 
 ### Added
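
[Illustration — not part of the patch series] Whichever strategy is configured, both listers must classify the same bucket contents identically: a block directory with meta.json is active, one without is partial. A sketch of an equivalence check relying only on the Lister interface from this series; the test name, block contents, and channel sizing are assumptions:

	package block_test

	import (
		"context"
		"strings"
		"testing"

		"github.com/go-kit/log"
		"github.com/oklog/ulid"
		"github.com/thanos-io/objstore"

		"github.com/thanos-io/thanos/pkg/block"
	)

	func TestDiscoveryStrategiesAgree(t *testing.T) {
		ctx := context.Background()
		logger := log.NewNopLogger()
		bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

		active := ulid.MustParse("01ARZ3NDEKTSV4RRFFQ69G5FAV")  // gets meta.json -> active
		partial := ulid.MustParse("01BX5ZZKBKACTAV9WEVGEMMVRZ") // chunks only -> partial
		if err := bkt.Upload(ctx, active.String()+"/meta.json", strings.NewReader("{}")); err != nil {
			t.Fatal(err)
		}
		if err := bkt.Upload(ctx, partial.String()+"/chunks/000001", strings.NewReader("x")); err != nil {
			t.Fatal(err)
		}

		for name, lister := range map[string]block.Lister{
			"concurrent": block.NewConcurrentLister(logger, bkt),
			"recursive":  block.NewRecursiveLister(logger, bkt),
		} {
			ch := make(chan ulid.ULID, 16) // buffered so the lister never blocks in this tiny test
			partialBlocks, err := lister.GetActiveAndPartialBlockIDs(ctx, ch)
			close(ch)
			if err != nil {
				t.Fatalf("%s: %v", name, err)
			}
			if !partialBlocks[partial] {
				t.Errorf("%s: expected %s to be reported as partial", name, partial)
			}
			for id := range ch {
				if id != active {
					t.Errorf("%s: unexpected active block %s", name, id)
				}
			}
		}
	}
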