diff --git a/CHANGELOG.md b/CHANGELOG.md index ca310bfb3bc..9b3079e4569 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7082](https://github.com/thanos-io/thanos/pull/7082) Stores: fix label values edge case when requesting external label values with matchers - [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early - [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction +- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred. ### Added - [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ee8158de1fc..8923bd376e5 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -239,8 +239,16 @@ func runCompact( consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)) timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) - baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) + var blockLister block.Lister + switch syncStrategy(conf.blockListStrategy) { + case concurrentDiscovery: + blockLister = block.NewConcurrentLister(logger, insBkt) + case recursiveDiscovery: + blockLister = block.NewRecursiveLister(logger, insBkt) + default: + return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) + } + baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } @@ -693,6 +701,7 @@ type compactConfig struct { wait bool waitInterval time.Duration disableDownsampling bool + blockListStrategy string blockMetaFetchConcurrency int blockFilesConcurrency int blockViewerSyncBlockInterval time.Duration @@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway"). Default("false").BoolVar(&cc.disableDownsampling) + strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ") + cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations."). 
+ Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage."). diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 6045bb29c11..3d6b1292617 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -90,7 +90,7 @@ func RunDownsample( insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) // While fetching blocks, filter out blocks that were marked for no downsample. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(block.FetcherConcurrency), downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency), diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index d8d1fffef00..1ced04637ba 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey()))) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey()))) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index a63f97d078e..dbe88e711ae 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/alecthomas/units" @@ -56,6 +57,13 @@ const ( retryIntervalDuration = 10 ) +type syncStrategy string + +const ( + concurrentDiscovery syncStrategy = "concurrent" + recursiveDiscovery syncStrategy = "recursive" +) + type storeConfig struct { indexCacheConfigs extflag.PathOrContent objStoreConfig extflag.PathOrContent @@ -74,6 +82,7 @@ type storeConfig struct { component component.StoreAPI debugLogging bool syncInterval time.Duration + blockListStrategy string blockSyncConcurrency int blockMetaFetchConcurrency int filterConf *store.FilterConfig @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("sync-block-duration", "Repeat 
interval for syncing the blocks between local and remote view."). Default("3m").DurationVar(&sc.syncInterval) + strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ") + cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations."). + Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy) + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) @@ -345,9 +358,17 @@ func runStore( return errors.Wrap(err, "create index cache") } + var blockLister block.Lister + switch syncStrategy(conf.blockListStrategy) { + case concurrentDiscovery: + blockLister = block.NewConcurrentLister(logger, insBkt) + case recursiveDiscovery: + blockLister = block.NewRecursiveLister(logger, insBkt) + default: + return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) + } ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) - metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index e41e5deef5d..16417d73d46 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -350,7 +350,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path // We ignore any block that has the deletion marker file. 
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)} - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { return err @@ -408,7 +408,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency) filters = append(filters, ignoreDeletionMarkFilter) } - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { return err @@ -510,7 +510,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat } insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err @@ -654,7 +654,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC return err } // TODO(bwplotka): Allow Bucket UI to visualize the state of block as well. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), @@ -833,7 +833,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat var sy *compact.Syncer { - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") @@ -1376,7 +1376,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P var sy *compact.Syncer { - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/docs/components/compact.md b/docs/components/compact.md index d210dd55a83..91e6fd04c64 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -279,6 +279,15 @@ usage: thanos compact [] Continuously compacts blocks in an object store bucket. Flags: + --block-discovery-strategy="concurrent" + One of concurrent, recursive. 
When set to + concurrent, stores will concurrently issue + one call per directory to discover active + blocks in the bucket. The recursive strategy + iterates through all objects in the bucket, + recursively traversing into each directory. + This avoids N+1 calls at the expense of having + slower bucket iterations. --block-files-concurrency=1 Number of goroutines to use when fetching/uploading block files from object diff --git a/docs/components/store.md b/docs/components/store.md index 85fd4ce6886..bbbeed87bd3 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -29,6 +29,15 @@ Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS. Flags: + --block-discovery-strategy="concurrent" + One of concurrent, recursive. When set to + concurrent, stores will concurrently issue + one call per directory to discover active + blocks in the bucket. The recursive strategy + iterates through all objects in the bucket, + recursively traversing into each directory. + This avoids N+1 calls at the expense of having + slower bucket iterations. --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 828503e91b3..4911b4748b7 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string { } } -// Fetcher interface to retieve blockId information from a bucket. -type BlockIDsFetcher interface { - // GetActiveBlocksIDs returning it via channel (streaming) and response. +// Lister lists block IDs from a bucket. +type Lister interface { + // GetActiveAndPartialBlockIDs streams active block IDs to the given channel and reports partial blocks via the returned map. // Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) } -type BaseBlockIDsFetcher struct { +// RecursiveLister lists block IDs by recursively iterating through a bucket. +type RecursiveLister struct { logger log.Logger bkt objstore.InstrumentedBucketReader } -func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher { - return &BaseBlockIDsFetcher{ +func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveLister { + return &RecursiveLister{ logger: logger, bkt: bkt, } } -func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { +func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { partialBlocks = make(map[ulid.ULID]bool) err = f.bkt.Iter(ctx, "", func(name string) error { parts := strings.Split(name, "/") @@ -216,6 +217,74 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c return partialBlocks, err } +// ConcurrentLister lists block IDs by doing a top level iteration of the bucket +// followed by one Exists call for each discovered block to detect partial blocks.
+type ConcurrentLister struct { + logger log.Logger + bkt objstore.InstrumentedBucketReader +} + +func NewConcurrentLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *ConcurrentLister { + return &ConcurrentLister{ + logger: logger, + bkt: bkt, + } +} + +func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { + const concurrency = 64 + + partialBlocks = make(map[ulid.ULID]bool) + var ( + metaChan = make(chan ulid.ULID, concurrency) + eg, gCtx = errgroup.WithContext(ctx) + mu sync.Mutex + ) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for uid := range metaChan { + // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. + // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. + // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). + metaFile := path.Join(uid.String(), MetaFilename) + ok, err := f.bkt.Exists(gCtx, metaFile) + if err != nil { + return errors.Wrapf(err, "meta.json file exists: %v", uid) + } + if !ok { + mu.Lock() + partialBlocks[uid] = true + mu.Unlock() + continue + } + ch <- uid + } + return nil + }) + } + + if err = f.bkt.Iter(ctx, "", func(name string) error { + id, ok := IsBlockDir(name) + if !ok { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case metaChan <- id: + } + return nil + }); err != nil { + return nil, err + } + close(metaChan) + + if err := eg.Wait(); err != nil { + return nil, err + } + return partialBlocks, nil +} + type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) UpdateOnChange(func([]metadata.Meta, error)) @@ -234,10 +303,10 @@ type MetadataFilter interface { // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Go-routine safe. type BaseFetcher struct { - logger log.Logger - concurrency int - bkt objstore.InstrumentedBucketReader - blockIDsFetcher BlockIDsFetcher + logger log.Logger + concurrency int + bkt objstore.InstrumentedBucketReader + blockIDsLister Lister // Optional local directory to cache meta.json files. cacheDir string @@ -249,12 +318,12 @@ type BaseFetcher struct { } // NewBaseFetcher constructs BaseFetcher. -func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) { +func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer) (*BaseFetcher, error) { return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg)) } // NewBaseFetcherWithMetrics constructs BaseFetcher. -func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { +func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister Lister, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -268,24 +337,24 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore. 
} return &BaseFetcher{ - logger: log.With(logger, "component", "block.BaseFetcher"), - concurrency: concurrency, - bkt: bkt, - blockIDsFetcher: blockIDsFetcher, - cacheDir: cacheDir, - cached: map[ulid.ULID]*metadata.Meta{}, - syncs: metrics.Syncs, + logger: log.With(logger, "component", "block.BaseFetcher"), + concurrency: concurrency, + bkt: bkt, + blockIDsLister: blockIDsLister, + cacheDir: cacheDir, + cached: map[ulid.ULID]*metadata.Meta{}, + syncs: metrics.Syncs, }, nil } // NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads. // NOTE: Not suitable to use in production. -func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) { +func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister) (*MetaFetcher, error) { return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil) } // NewMetaFetcher returns meta fetcher. -func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) { +func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) { b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg) if err != nil { return nil, err @@ -294,7 +363,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente } // NewMetaFetcherWithMetrics returns meta fetcher. -func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) { +func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) { b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics) if err != nil { return nil, err @@ -445,7 +514,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { // Workers scheduled, distribute blocks. 
eg.Go(func() error { defer close(ch) - partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch) + partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch) return err }) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 11384e88fac..5e24e265382 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" @@ -73,7 +74,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { r := prometheus.NewRegistry() noopLogger := log.NewNopLogger() insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt) + baseBlockIDsFetcher := NewConcurrentLister(noopLogger, insBkt) baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r) testutil.Ok(t, err) diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index b66ca9f018a..cd6135b702e 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -19,6 +19,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -30,7 +31,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) logger := log.NewNopLogger() - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f1e01ec4f49..12ef8961542 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/dedup" @@ -95,7 +96,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(nil, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ duplicateBlocksFilter, }) @@ -197,7 +198,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, @@ -509,7 +510,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) 
+ baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index c1936f095a9..d80895617ce 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -245,7 +246,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 1f8cdef2e42..668d64afce9 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -244,7 +244,7 @@ func newMetaFetcher( if ignoreMarkedForDeletion { filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency)) } - baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt) + baseBlockIDsFetcher := thanosblock.NewConcurrentLister(logger, fromBkt) return thanosblock.NewMetaFetcher( logger, concurrency, diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index dae3e503c11..8bbb97f5eb1 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -900,7 +901,7 @@ func TestBucketStore_Acceptance(t *testing.T) { testutil.Ok(tt, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), }) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 02c182cd0b5..c91fb4096df 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -154,7 +154,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m } insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(s.logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(s.logger, insBkt) metaFetcher, err := block.NewMetaFetcher(s.logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 67223a9467f..9456af303c0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -883,7 +883,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul rec := &recorder{Bucket: bkt} insBkt := objstore.WithNoopInstr(bkt) - 
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConf), @@ -1406,7 +1406,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, } ibkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, ibkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, ibkt) f, err := block.NewRawMetaFetcher(logger, ibkt, baseBlockIDsFetcher) testutil.Ok(t, err) @@ -1856,7 +1856,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { ) // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -1948,7 +1948,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2107,7 +2107,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { } // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2294,7 +2294,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb } // Instance a real bucket store we'll use to query back the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2511,7 +2511,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. 
- baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -3487,7 +3487,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), }) @@ -3703,7 +3703,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { testutil.Ok(t, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), })
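For reference, here is a minimal sketch (not part of this diff) of how the two strategies wire into a meta fetcher, mirroring the switch added in `cmd/thanos/store.go` and `cmd/thanos/compact.go`. The in-memory bucket, nop logger, and nil registerer/filters are illustrative stand-ins for real wiring:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/thanos-io/thanos/pkg/block"
)

func main() {
	logger := log.NewNopLogger()
	// WithNoopInstr satisfies the objstore.InstrumentedBucketReader both listers expect.
	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

	// Default strategy (--block-discovery-strategy=concurrent): one top-level Iter
	// plus one Exists call per discovered block, i.e. N+1 object store calls.
	var lister block.Lister = block.NewConcurrentLister(logger, bkt)
	// Cheaper but slower alternative (--block-discovery-strategy=recursive):
	// lister = block.NewRecursiveLister(logger, bkt)

	fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, lister, "", nil, nil)
	if err != nil {
		panic(err)
	}

	metas, partial, err := fetcher.Fetch(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("active blocks: %d, partial blocks: %d\n", len(metas), len(partial))
}
```

The concurrent default restores the pre-#6474 (v0.31) behaviour: discovery is fast but issues many calls, while `recursive` performs a single recursive iteration, which can be cheaper on providers that charge per request.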
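The rename from `BlockIDsFetcher` to `Lister` also makes the discovery strategy pluggable. A hypothetical third implementation (illustration only, not in this PR) shows the contract `GetActiveAndPartialBlockIDs` must honour: active block ULIDs are streamed to the channel while partial blocks are reported through the returned map:

```go
package mylister

import (
	"context"

	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block"
)

// StaticLister serves a fixed set of block IDs; useful only as a demonstration
// (or a test double) of the block.Lister contract.
type StaticLister struct {
	Active  []ulid.ULID // blocks that have a meta.json
	Partial []ulid.ULID // blocks without a meta.json
}

// Compile-time check that StaticLister satisfies the new interface.
var _ block.Lister = (*StaticLister)(nil)

func (s *StaticLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (map[ulid.ULID]bool, error) {
	partial := make(map[ulid.ULID]bool, len(s.Partial))
	for _, id := range s.Partial {
		partial[id] = true
	}
	for _, id := range s.Active {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case ch <- id: // stream active IDs as the fetcher's workers consume them
		}
	}
	return partial, nil
}
```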