diff --git a/go.mod b/go.mod index 9e7f8dd8aa6..6e0bef3c807 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/sony/gobreaker v0.4.1 github.com/spf13/afero v1.2.2 github.com/stretchr/testify v1.6.1 - github.com/thanos-io/thanos v0.13.1-0.20201125124348-2008ef009e88 + github.com/thanos-io/thanos v0.13.1-0.20201130180807-84afc97e7d58 github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50 diff --git a/go.sum b/go.sum index 77419695494..686d6953cca 100644 --- a/go.sum +++ b/go.sum @@ -1146,8 +1146,8 @@ github.com/thanos-io/thanos v0.13.1-0.20200807203500-9b578afb4763/go.mod h1:KyW0 github.com/thanos-io/thanos v0.13.1-0.20201019130456-f41940581d9a/go.mod h1:A3qUEEbsVkplJnxyDLwuIuvTDaJPByTH+hMdTl9ujAA= github.com/thanos-io/thanos v0.13.1-0.20201030101306-47f9a225cc52 h1:z3hglXVwJ4HgU0OoDS+8+MvEipv/U83IQ+fMsDr00YQ= github.com/thanos-io/thanos v0.13.1-0.20201030101306-47f9a225cc52/go.mod h1:OqqX4x21cg5N5MMHd/yGQAc/V3wg8a7Do4Jk8HfaFZQ= -github.com/thanos-io/thanos v0.13.1-0.20201125124348-2008ef009e88 h1:r3lyiYA58zA6yE4CqE0ncavgI2rHgyhLQom5z0usbNM= -github.com/thanos-io/thanos v0.13.1-0.20201125124348-2008ef009e88/go.mod h1:ffr9z+gefM664JBH/CEMHyHvShq2BQTejT/Ws+V+80Q= +github.com/thanos-io/thanos v0.13.1-0.20201130180807-84afc97e7d58 h1:Q5t3TKhiFQ2J3XQv1psoMBSBk/Dx6p4JqoETXiWQaYg= +github.com/thanos-io/thanos v0.13.1-0.20201130180807-84afc97e7d58/go.mod h1:ffr9z+gefM664JBH/CEMHyHvShq2BQTejT/Ws+V+80Q= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/querier/block_meta.go b/pkg/querier/block_meta.go deleted file mode 100644 index ccce0ab2403..00000000000 --- a/pkg/querier/block_meta.go +++ /dev/null @@ -1,52 +0,0 @@ -package querier - -import ( - "fmt" - "strings" - "time" - - "github.com/oklog/ulid" - "github.com/thanos-io/thanos/pkg/block/metadata" - - "github.com/cortexproject/cortex/pkg/util" -) - -// BlockMeta is a struct extending the Thanos block metadata and adding -// Cortex-specific data too. -type BlockMeta struct { - metadata.Meta - - // UploadedAt is the timestamp when the block has been completed to be uploaded - // to the storage. 
- UploadedAt time.Time -} - -func (m BlockMeta) String() string { - minT := util.TimeFromMillis(m.MinTime).UTC() - maxT := util.TimeFromMillis(m.MaxTime).UTC() - - return fmt.Sprintf("%s (min time: %s max time: %s)", m.ULID, minT.String(), maxT.String()) -} - -type BlockMetas []*BlockMeta - -func (s BlockMetas) String() string { - b := strings.Builder{} - - for idx, m := range s { - if idx > 0 { - b.WriteString(", ") - } - b.WriteString(m.String()) - } - - return b.String() -} - -func getULIDsFromBlockMetas(metas []*BlockMeta) []ulid.ULID { - ids := make([]ulid.ULID, len(metas)) - for i, m := range metas { - ids[i] = m.ULID - } - return ids -} diff --git a/pkg/querier/blocks_consistency_checker.go b/pkg/querier/blocks_consistency_checker.go index 6edebf057cd..b0de409110c 100644 --- a/pkg/querier/blocks_consistency_checker.go +++ b/pkg/querier/blocks_consistency_checker.go @@ -8,7 +8,8 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" ) type BlocksConsistencyChecker struct { @@ -36,7 +37,7 @@ func NewBlocksConsistencyChecker(uploadGracePeriod, deletionGracePeriod time.Dur } } -func (c *BlocksConsistencyChecker) Check(knownBlocks []*BlockMeta, knownDeletionMarks map[ulid.ULID]*metadata.DeletionMark, queriedBlocks []ulid.ULID) (missingBlocks []ulid.ULID) { +func (c *BlocksConsistencyChecker) Check(knownBlocks bucketindex.Blocks, knownDeletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, queriedBlocks []ulid.ULID) (missingBlocks []ulid.ULID) { c.checksTotal.Inc() // Reverse the map of queried blocks, so that we can easily look for missing ones. @@ -46,7 +47,7 @@ func (c *BlocksConsistencyChecker) Check(knownBlocks []*BlockMeta, knownDeletion } // Look for any missing block. - for _, meta := range knownBlocks { + for _, block := range knownBlocks { // Some recently uploaded blocks, already discovered by the querier, may not have been discovered // and loaded by the store-gateway yet. In order to avoid false positives, we grant some time // to the store-gateway to discover them. It's safe to exclude recently uploaded blocks because: @@ -54,8 +55,8 @@ func (c *BlocksConsistencyChecker) Check(knownBlocks []*BlockMeta, knownDeletion // on the configured retention period). // - Blocks uploaded by compactor: the source blocks are marked for deletion but will continue to be // queried by queriers for a while (depends on the configured deletion marks delay). - if c.uploadGracePeriod > 0 && time.Since(meta.UploadedAt) < c.uploadGracePeriod { - level.Debug(c.logger).Log("msg", "block skipped from consistency check because it was uploaded recently", "block", meta.ULID.String(), "uploadedAt", meta.UploadedAt.String()) + if c.uploadGracePeriod > 0 && time.Since(block.GetUploadedAt()) < c.uploadGracePeriod { + level.Debug(c.logger).Log("msg", "block skipped from consistency check because it was uploaded recently", "block", block.ID.String(), "uploadedAt", block.GetUploadedAt().String()) continue } @@ -63,17 +64,17 @@ func (c *BlocksConsistencyChecker) Check(knownBlocks []*BlockMeta, knownDeletion // on blocks that can't be queried because they were offloaded. For this reason, we don't run the consistency check on any block // which has been marked for deletion more then "grace period" time ago. 
Basically, the grace period is the time // we still expect a block marked for deletion to be still queried. - if mark := knownDeletionMarks[meta.ULID]; mark != nil { + if mark := knownDeletionMarks[block.ID]; mark != nil { deletionTime := time.Unix(mark.DeletionTime, 0) if c.deletionGracePeriod > 0 && time.Since(deletionTime) > c.deletionGracePeriod { - level.Debug(c.logger).Log("msg", "block skipped from consistency check because it is marked for deletion", "block", meta.ULID.String(), "deletionTime", deletionTime.String()) + level.Debug(c.logger).Log("msg", "block skipped from consistency check because it is marked for deletion", "block", block.ID.String(), "deletionTime", deletionTime.String()) continue } } - if _, ok := actualBlocks[meta.ULID]; !ok { - missingBlocks = append(missingBlocks, meta.ULID) + if _, ok := actualBlocks[block.ID]; !ok { + missingBlocks = append(missingBlocks, block.ID) } } diff --git a/pkg/querier/blocks_consistency_checker_test.go b/pkg/querier/blocks_consistency_checker_test.go index a2829caa73d..f68206d664b 100644 --- a/pkg/querier/blocks_consistency_checker_test.go +++ b/pkg/querier/blocks_consistency_checker_test.go @@ -8,10 +8,9 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" - "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" ) @@ -25,78 +24,78 @@ func TestBlocksConsistencyChecker_Check(t *testing.T) { block3 := ulid.MustNew(uint64(util.TimeToMillis(now.Add(-uploadGracePeriod*4))), nil) tests := map[string]struct { - knownBlocks []*BlockMeta - knownDeletionMarks map[ulid.ULID]*metadata.DeletionMark + knownBlocks bucketindex.Blocks + knownDeletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark queriedBlocks []ulid.ULID expectedMissingBlocks []ulid.ULID }{ "no known blocks": { - knownBlocks: []*BlockMeta{}, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownBlocks: bucketindex.Blocks{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{}, }, "all known blocks have been queried from a single store-gateway": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{block1, block2}, }, "all known blocks have been queried from multiple store-gateway": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{block1, block2}, }, "store-gateway has queried more blocks than 
expected": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{block1, block2, block3}, }, "store-gateway has queried less blocks than expected": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block3, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{block1, block3}, expectedMissingBlocks: []ulid.ULID{block2}, }, "store-gateway has queried less blocks than expected, but the missing block has been recently uploaded": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}, UploadedAt: now.Add(-uploadGracePeriod).Add(time.Minute)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block3, UploadedAt: now.Add(-uploadGracePeriod).Add(time.Minute).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{}, + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, queriedBlocks: []ulid.ULID{block1, block2}, }, "store-gateway has queried less blocks than expected and the missing block has been recently marked for deletion": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block3, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{ + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ block3: {DeletionTime: now.Add(-deletionGracePeriod / 2).Unix()}, }, queriedBlocks: []ulid.ULID{block1, block2}, expectedMissingBlocks: []ulid.ULID{block3}, }, "store-gateway has queried less blocks than expected and the missing block has been marked for deletion long time ago": { - knownBlocks: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}, UploadedAt: now.Add(-time.Hour)}, - {Meta: 
metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}, UploadedAt: now.Add(-time.Hour)}, + knownBlocks: bucketindex.Blocks{ + {ID: block1, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block2, UploadedAt: now.Add(-time.Hour).Unix()}, + {ID: block3, UploadedAt: now.Add(-time.Hour).Unix()}, }, - knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{ + knownDeletionMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ block3: {DeletionTime: now.Add(-deletionGracePeriod * 2).Unix()}, }, queriedBlocks: []ulid.ULID{block1, block2}, diff --git a/pkg/querier/blocks_scanner.go b/pkg/querier/blocks_scanner.go index b3417219627..c88259ed9d1 100644 --- a/pkg/querier/blocks_scanner.go +++ b/pkg/querier/blocks_scanner.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" @@ -56,9 +57,9 @@ type BlocksScanner struct { // Keep the per-tenant/user metas found during the last run. userMx sync.RWMutex - userMetas map[string][]*BlockMeta - userMetasLookup map[string]map[ulid.ULID]*BlockMeta - userDeletionMarks map[string]map[ulid.ULID]*metadata.DeletionMark + userMetas map[string]bucketindex.Blocks + userMetasLookup map[string]map[ulid.ULID]*bucketindex.Block + userDeletionMarks map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark scanDuration prometheus.Histogram scanLastSuccess prometheus.Gauge @@ -71,9 +72,9 @@ func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, log bucketClient: bucketClient, fetchers: make(map[string]userFetcher), usersScanner: cortex_tsdb.NewUsersScanner(bucketClient, cortex_tsdb.AllUsers, logger), - userMetas: make(map[string][]*BlockMeta), - userMetasLookup: make(map[string]map[ulid.ULID]*BlockMeta), - userDeletionMarks: map[string]map[ulid.ULID]*metadata.DeletionMark{}, + userMetas: make(map[string]bucketindex.Blocks), + userMetasLookup: make(map[string]map[ulid.ULID]*bucketindex.Block), + userDeletionMarks: map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark{}, fetchersMetrics: storegateway.NewMetadataFetcherMetrics(), scanDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_querier_blocks_scan_duration_seconds", @@ -100,7 +101,7 @@ func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, log // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. -func (d *BlocksScanner) GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta, map[ulid.ULID]*metadata.DeletionMark, error) { +func (d *BlocksScanner) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { // We need to ensure the initial full bucket scan succeeded. if d.State() != services.Running { return nil, nil, errBlocksScannerNotRunning @@ -119,7 +120,7 @@ func (d *BlocksScanner) GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta // Given we do expect the large majority of queries to have a time range close // to "now", we're going to find matching blocks iterating the list in reverse order. 
- var matchingMetas []*BlockMeta + var matchingMetas bucketindex.Blocks for i := len(userMetas) - 1; i >= 0; i-- { // NOTE: Block intervals are half-open: [MinTime, MaxTime). if userMetas[i].MinTime <= maxT && minT < userMetas[i].MaxTime { @@ -133,11 +134,11 @@ func (d *BlocksScanner) GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta } // Filter deletion marks by matching blocks only. - matchingDeletionMarks := map[ulid.ULID]*metadata.DeletionMark{} + matchingDeletionMarks := map[ulid.ULID]*bucketindex.BlockDeletionMark{} if userDeletionMarks, ok := d.userDeletionMarks[userID]; ok { for _, m := range matchingMetas { - if d := userDeletionMarks[m.ULID]; d != nil { - matchingDeletionMarks[m.ULID] = d + if d := userDeletionMarks[m.ID]; d != nil { + matchingDeletionMarks[m.ID] = d } } } @@ -181,9 +182,9 @@ func (d *BlocksScanner) scanBucket(ctx context.Context) (returnErr error) { jobsChan := make(chan string) resMx := sync.Mutex{} - resMetas := map[string][]*BlockMeta{} - resMetasLookup := map[string]map[ulid.ULID]*BlockMeta{} - resDeletionMarks := map[string]map[ulid.ULID]*metadata.DeletionMark{} + resMetas := map[string]bucketindex.Blocks{} + resMetasLookup := map[string]map[ulid.ULID]*bucketindex.Block{} + resDeletionMarks := map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark{} resErrs := tsdb_errors.NewMulti() // Create a pool of workers which will synchronize metas. The pool size @@ -200,9 +201,9 @@ func (d *BlocksScanner) scanBucket(ctx context.Context) (returnErr error) { metas, deletionMarks, err := d.scanUserBlocksWithRetries(ctx, userID) // Build the lookup map. - lookup := map[ulid.ULID]*BlockMeta{} + lookup := map[ulid.ULID]*bucketindex.Block{} for _, m := range metas { - lookup[m.ULID] = m + lookup[m.ID] = m } resMx.Lock() @@ -264,7 +265,7 @@ pushJobsLoop: // scanUserBlocksWithRetries runs scanUserBlocks() retrying multiple times // in case of error. -func (d *BlocksScanner) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas []*BlockMeta, deletionMarks map[ulid.ULID]*metadata.DeletionMark, err error) { +func (d *BlocksScanner) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas bucketindex.Blocks, deletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, err error) { retries := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: time.Second, MaxBackoff: 30 * time.Second, @@ -283,7 +284,7 @@ func (d *BlocksScanner) scanUserBlocksWithRetries(ctx context.Context, userID st return } -func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) ([]*BlockMeta, map[ulid.ULID]*metadata.DeletionMark, error) { +func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { fetcher, userBucket, deletionMarkFilter, err := d.getOrCreateMetaFetcher(userID) if err != nil { return nil, nil, errors.Wrapf(err, "create meta fetcher for user %s", userID) @@ -300,11 +301,9 @@ func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) ([]*B logPartialBlocks(userID, partials, d.logger) } - res := make([]*BlockMeta, 0, len(metas)) + res := make(bucketindex.Blocks, 0, len(metas)) for _, m := range metas { - blockMeta := &BlockMeta{ - Meta: *m, - } + blockMeta := bucketindex.BlockFromThanosMeta(*m) // If the block is already known, we can get the remaining attributes from there // because a block is immutable. 
@@ -320,7 +319,7 @@
 		// Since the meta.json file is the last file of a block being uploaded and it's immutable
 		// we can safely assume that the last modified timestamp of the meta.json is the time when
 		// the block has completed to be uploaded.
-		blockMeta.UploadedAt = attrs.LastModified
+		blockMeta.UploadedAt = attrs.LastModified.Unix()
 	}
 
 	res = append(res, blockMeta)
@@ -329,7 +328,13 @@
 	// The blocks scanner expects all blocks to be sorted by max time.
 	sortBlockMetasByMaxTime(res)
 
-	return res, deletionMarkFilter.DeletionMarkBlocks(), nil
+	// Convert deletion marks to our own data type.
+	marks := map[ulid.ULID]*bucketindex.BlockDeletionMark{}
+	for id, m := range deletionMarkFilter.DeletionMarkBlocks() {
+		marks[id] = bucketindex.BlockDeletionMarkFromThanosMarker(m)
+	}
+
+	return res, marks, nil
 }
 
 func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
@@ -386,7 +391,7 @@
 	return f, userBucket, deletionMarkFilter, nil
 }
 
-func (d *BlocksScanner) getBlockMeta(userID string, blockID ulid.ULID) *BlockMeta {
+func (d *BlocksScanner) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block {
 	d.userMx.RLock()
 	defer d.userMx.RUnlock()
@@ -398,9 +403,9 @@
 	return metas[blockID]
 }
 
-func sortBlockMetasByMaxTime(metas []*BlockMeta) {
-	sort.Slice(metas, func(i, j int) bool {
-		return metas[i].MaxTime < metas[j].MaxTime
+func sortBlockMetasByMaxTime(blocks bucketindex.Blocks) {
+	sort.Slice(blocks, func(i, j int) bool {
+		return blocks[i].MaxTime < blocks[j].MaxTime
 	})
 }
diff --git a/pkg/querier/blocks_scanner_test.go b/pkg/querier/blocks_scanner_test.go
index eb3e9d20e0c..c0bffcfae42 100644
--- a/pkg/querier/blocks_scanner_test.go
+++ b/pkg/querier/blocks_scanner_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
@@ -36,26 +37,26 @@ func TestBlocksScanner_InitialScan(t *testing.T) {
 	user1Block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
 	user1Block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
 	user2Block1 := mockStorageBlock(t, bucket, "user-2", 10, 20)
-	user2Mark1 := mockStorageDeletionMark(t, bucket, "user-2", user2Block1)
+	user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-2", user2Block1))
 
 	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 
-	blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30)
+	blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
 	require.NoError(t, err)
 	require.Equal(t, 2, len(blocks))
-	assert.Equal(t, user1Block2.ULID, blocks[0].ULID)
-	assert.Equal(t, user1Block1.ULID, blocks[1].ULID)
-	assert.WithinDuration(t, time.Now(), blocks[0].UploadedAt, 5*time.Second)
-	assert.WithinDuration(t, time.Now(), blocks[1].UploadedAt, 5*time.Second)
+	assert.Equal(t, user1Block2.ULID, blocks[0].ID)
+	assert.Equal(t, user1Block1.ULID, blocks[1].ID)
+	assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second)
+	assert.WithinDuration(t, time.Now(), 
blocks[1].GetUploadedAt(), 5*time.Second) assert.Empty(t, deletionMarks) - blocks, deletionMarks, err = s.GetBlocks("user-2", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30) require.NoError(t, err) require.Equal(t, 1, len(blocks)) - assert.Equal(t, user2Block1.ULID, blocks[0].ULID) - assert.WithinDuration(t, time.Now(), blocks[0].UploadedAt, 5*time.Second) - assert.Equal(t, map[ulid.ULID]*metadata.DeletionMark{ - user2Block1.ULID: &user2Mark1, + assert.Equal(t, user2Block1.ULID, blocks[0].ID) + assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) + assert.Equal(t, map[ulid.ULID]*bucketindex.BlockDeletionMark{ + user2Block1.ULID: user2Mark1, }, deletionMarks) assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` @@ -105,7 +106,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) { require.NoError(t, s.StartAsync(ctx)) require.Error(t, s.AwaitRunning(ctx)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) assert.Equal(t, errBlocksScannerNotRunning, err) assert.Nil(t, blocks) assert.Nil(t, deletionMarks) @@ -214,27 +215,27 @@ func TestBlocksScanner_PeriodicScanFindsNewUser(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) block1 := mockStorageBlock(t, bucket, "user-1", 10, 20) block2 := mockStorageBlock(t, bucket, "user-1", 20, 30) - mark2 := mockStorageDeletionMark(t, bucket, "user-1", block2) + mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block2)) // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) - assert.WithinDuration(t, time.Now(), blocks[0].UploadedAt, 5*time.Second) - assert.WithinDuration(t, time.Now(), blocks[1].UploadedAt, 5*time.Second) - assert.Equal(t, map[ulid.ULID]*metadata.DeletionMark{ - block2.ULID: &mark2, + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) + assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) + assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) + assert.Equal(t, map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block2.ULID: mark2, }, deletionMarks) } @@ -247,11 +248,11 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 1, len(blocks)) - assert.Equal(t, block1.ULID, blocks[0].ULID) - assert.WithinDuration(t, time.Now(), blocks[0].UploadedAt, 5*time.Second) + assert.Equal(t, block1.ULID, blocks[0].ID) + assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) assert.Empty(t, deletionMarks) block2 := mockStorageBlock(t, bucket, "user-1", 20, 30) @@ -259,13 +260,13 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - 
blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) - assert.WithinDuration(t, time.Now(), blocks[0].UploadedAt, 5*time.Second) - assert.WithinDuration(t, time.Now(), blocks[1].UploadedAt, 5*time.Second) + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) + assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) + assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) assert.Empty(t, deletionMarks) } @@ -279,25 +280,25 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) assert.Empty(t, deletionMarks) - mark1 := mockStorageDeletionMark(t, bucket, "user-1", block1) + mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block1)) // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) - assert.Equal(t, map[ulid.ULID]*metadata.DeletionMark{ - block1.ULID: &mark1, + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) + assert.Equal(t, map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block1.ULID: mark1, }, deletionMarks) } @@ -311,11 +312,11 @@ func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) assert.Empty(t, deletionMarks) require.NoError(t, bucket.Delete(ctx, fmt.Sprintf("%s/%s", "user-1", block1.ULID.String()))) @@ -323,10 +324,10 @@ func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 1, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) + assert.Equal(t, block2.ULID, blocks[0].ID) assert.Empty(t, deletionMarks) } @@ -340,11 +341,11 @@ func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, 
block1.ULID, blocks[1].ULID) + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) assert.Empty(t, deletionMarks) require.NoError(t, bucket.Delete(ctx, "user-1")) @@ -352,7 +353,7 @@ func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) @@ -368,11 +369,11 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks("user-1", 0, 40) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40) require.NoError(t, err) require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ULID) - assert.Equal(t, block1.ULID, blocks[1].ULID) + assert.Equal(t, block2.ULID, blocks[0].ID) + assert.Equal(t, block1.ULID, blocks[1].ID) assert.Empty(t, deletionMarks) require.NoError(t, bucket.Delete(ctx, "user-1")) @@ -380,7 +381,7 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 40) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) @@ -390,10 +391,10 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks("user-1", 0, 40) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) require.NoError(t, err) require.Equal(t, 1, len(blocks)) - assert.Equal(t, block3.ULID, blocks[0].ULID) + assert.Equal(t, block3.ULID, blocks[0].ID) assert.Empty(t, deletionMarks) } @@ -406,7 +407,7 @@ func TestBlocksScanner_GetBlocks(t *testing.T) { block2 := mockStorageBlock(t, bucket, "user-1", 12, 20) block3 := mockStorageBlock(t, bucket, "user-1", 20, 30) block4 := mockStorageBlock(t, bucket, "user-1", 30, 40) - mark3 := mockStorageDeletionMark(t, bucket, "user-1", block3) + mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block3)) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) @@ -414,73 +415,73 @@ func TestBlocksScanner_GetBlocks(t *testing.T) { minT int64 maxT int64 expectedMetas []tsdb.BlockMeta - expectedMarks map[ulid.ULID]*metadata.DeletionMark + expectedMarks map[ulid.ULID]*bucketindex.BlockDeletionMark }{ "no matching block because the range is too low": { minT: 0, maxT: 5, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, }, "no matching block because the range is too high": { minT: 50, maxT: 60, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, }, "matching all blocks": { minT: 0, maxT: 60, expectedMetas: []tsdb.BlockMeta{block4, block3, block2, block1}, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{ - block3.ULID: &mark3, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ULID: mark3, }, }, "query range starting at a block maxT": { minT: block3.MaxTime, maxT: 60, expectedMetas: []tsdb.BlockMeta{block4}, - expectedMarks: 
map[ulid.ULID]*metadata.DeletionMark{}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, }, "query range ending at a block minT": { minT: block3.MinTime, maxT: block4.MinTime, expectedMetas: []tsdb.BlockMeta{block4, block3}, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{ - block3.ULID: &mark3, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ULID: mark3, }, }, "query range within a single block": { minT: block3.MinTime + 2, maxT: block3.MaxTime - 2, expectedMetas: []tsdb.BlockMeta{block3}, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{ - block3.ULID: &mark3, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ULID: mark3, }, }, "query range within multiple blocks": { minT: 13, maxT: 16, expectedMetas: []tsdb.BlockMeta{block2, block1}, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, }, "query range matching exactly a single block": { minT: block3.MinTime, maxT: block3.MaxTime - 1, expectedMetas: []tsdb.BlockMeta{block3}, - expectedMarks: map[ulid.ULID]*metadata.DeletionMark{ - block3.ULID: &mark3, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ULID: mark3, }, }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - metas, deletionMarks, err := s.GetBlocks("user-1", testData.minT, testData.maxT) + metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT) require.NoError(t, err) require.Equal(t, len(testData.expectedMetas), len(metas)) require.Equal(t, testData.expectedMarks, deletionMarks) for i, expectedBlock := range testData.expectedMetas { - assert.Equal(t, expectedBlock.ULID, metas[i].ULID) + assert.Equal(t, expectedBlock.ULID, metas[i].ID) } }) } @@ -515,7 +516,6 @@ func prepareBlocksScannerConfig() BlocksScannerConfig { ScanInterval: time.Minute, TenantsConcurrency: 10, MetasConcurrency: 10, - ConsistencyDelay: 0, IgnoreDeletionMarksDelay: time.Hour, } } @@ -548,7 +548,7 @@ func mockStorageBlock(t *testing.T, bucket objstore.Bucket, userID string, minT, return meta } -func mockStorageDeletionMark(t *testing.T, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) metadata.DeletionMark { +func mockStorageDeletionMark(t *testing.T, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *metadata.DeletionMark { mark := metadata.DeletionMark{ ID: meta.ULID, DeletionTime: time.Now().Add(-time.Minute).Unix(), @@ -564,5 +564,5 @@ func mockStorageDeletionMark(t *testing.T, bucket objstore.Bucket, userID string markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), metadata.DeletionMarkFilename) require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader)) - return mark + return &mark } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 2f0b845e16f..b127bf08e64 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -32,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + 
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/tenant" @@ -68,7 +68,7 @@ type BlocksFinder interface { // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. - GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta, map[ulid.ULID]*metadata.DeletionMark, error) + GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) } // BlocksStoreClient is the interface that should be implemented by any client used @@ -161,7 +161,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa // Blocks scanner doesn't use chunks, but we pass config for consistency. cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { - return nil, errors.Wrapf(err, "create caching bucket") + return nil, errors.Wrap(err, "create caching bucket") } bucketClient = cachingBucket @@ -170,7 +170,6 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency, MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency, CacheDir: storageCfg.BucketStore.SyncDir, - ConsistencyDelay: storageCfg.BucketStore.ConsistencyDelay, IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, }, bucketClient, logger, reg) @@ -446,22 +445,22 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg } // Find the list of blocks we need to query given the time range. - knownMetas, knownDeletionMarks, err := q.finder.GetBlocks(q.userID, minT, maxT) + knownBlocks, knownDeletionMarks, err := q.finder.GetBlocks(ctx, q.userID, minT, maxT) if err != nil { return err } - if len(knownMetas) == 0 { + if len(knownBlocks) == 0 { q.metrics.storesHit.Observe(0) level.Debug(logger).Log("msg", "no blocks found") return nil } - level.Debug(logger).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String()) + level.Debug(logger).Log("msg", "found blocks to query", "expected", knownBlocks.String()) var ( // At the beginning the list of blocks to query are all known blocks. - remainingBlocks = getULIDsFromBlockMetas(knownMetas) + remainingBlocks = knownBlocks.GetULIDs() attemptedBlocks = map[ulid.ULID][]string{} touchedStores = map[string]struct{}{} @@ -504,7 +503,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg } // Ensure all expected blocks have been queried (during all tries done so far). 
- missingBlocks := q.consistency.Check(knownMetas, knownDeletionMarks, resQueriedBlocks) + missingBlocks := q.consistency.Check(knownBlocks, knownDeletionMarks, resQueriedBlocks) if len(missingBlocks) == 0 { q.metrics.storesHit.Observe(float64(len(touchedStores))) q.metrics.refetches.Observe(float64(attempt - 1)) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 52587ddf9ca..a0425ea621c 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -18,18 +18,17 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" @@ -63,7 +62,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { } tests := map[string]struct { - finderResult []*BlockMeta + finderResult bucketindex.Blocks finderErr error storeSetResponses []interface{} limits BlocksStoreLimits @@ -82,9 +81,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedErr: "unable to find blocks", }, "error while getting clients to query the store-gateway": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ errors.New("no client found"), @@ -93,9 +92,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedErr: "no client found", }, "a single store-gateway instance holds the required blocks (single returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -118,9 +117,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, "a single store-gateway instance holds the required blocks (multiple returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -149,9 +148,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, "multiple store-gateway instances holds the required blocks without overlapping series (single returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ 
map[BlocksStoreClient][]ulid.ULID{ @@ -177,9 +176,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, "multiple store-gateway instances holds the required blocks with overlapping series (single returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -206,9 +205,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -274,9 +273,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { `, }, "a single store-gateway instance has some missing blocks (consistency check failed)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. @@ -294,11 +293,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s", block2.String()), }, "multiple store-gateway instances have some missing blocks (consistency check failed)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. @@ -319,11 +318,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), }, "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. 
@@ -397,9 +396,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { `, }, "max chunks per query limit greater then the number of chunks fetched": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -422,9 +421,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, "max chunks per query limit hit while fetching chunks at first attempt": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -439,11 +438,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedErr: fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. @@ -483,7 +482,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} - finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) q := &blocksStoreQuerier{ ctx: ctx, @@ -568,7 +567,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { ) tests := map[string]struct { - finderResult []*BlockMeta + finderResult bucketindex.Blocks finderErr error storeSetResponses []interface{} expectedLabelNames []string @@ -585,9 +584,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedErr: "unable to find blocks", }, "error while getting clients to query the store-gateway": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ errors.New("no client found"), @@ -595,9 +594,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedErr: "no client found", }, "a single store-gateway instance holds the required blocks": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -620,9 +619,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedLabelValues: valuesFromSeries(labels.MetricName, 
series1, series2), }, "multiple store-gateway instances holds the required blocks without overlapping series": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -658,9 +657,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), }, "multiple store-gateway instances holds the required blocks with overlapping series (single returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ @@ -696,9 +695,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedLabelValues: valuesFromSeries(labels.MetricName, series1), }, "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned series)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, // Block1 has series1 and series2 // Block2 has only series1 @@ -777,9 +776,9 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { `, }, "a single store-gateway instance has some missing blocks (consistency check failed)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. @@ -804,11 +803,11 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s", block2.String()), }, "multiple store-gateway instances have some missing blocks (consistency check failed)": { - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. @@ -850,11 +849,11 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { // Block2 has series2 // Block3 has series1 and series2 // Block4 has no series (poor lonely block) - finderResult: []*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, }, storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. 
@@ -960,7 +959,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} - finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) q := &blocksStoreQuerier{ ctx: ctx, @@ -1056,7 +1055,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) for testName, testData := range tests { t.Run(testName, func(t *testing.T) { finder := &blocksFinderMock{} - finder.On("GetBlocks", "user-1", mock.Anything, mock.Anything).Return([]*BlockMeta(nil), map[ulid.ULID]*metadata.DeletionMark(nil), error(nil)) + finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) q := &blocksStoreQuerier{ ctx: context.Background(), @@ -1084,8 +1083,8 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) assert.Len(t, finder.Calls, 0) } else { require.Len(t, finder.Calls, 1) - assert.Equal(t, testData.expectedMinT, finder.Calls[0].Arguments.Get(1)) - assert.InDelta(t, testData.expectedMaxT, finder.Calls[0].Arguments.Get(2), float64(5*time.Second.Milliseconds())) + assert.Equal(t, testData.expectedMinT, finder.Calls[0].Arguments.Get(2)) + assert.InDelta(t, testData.expectedMaxT, finder.Calls[0].Arguments.Get(3), float64(5*time.Second.Milliseconds())) } }) } @@ -1119,10 +1118,10 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { finder := &blocksFinderMock{ Service: services.NewIdleService(nil, nil), } - finder.On("GetBlocks", "user-1", mock.Anything, mock.Anything).Return([]*BlockMeta{ - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, - {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, - }, map[ulid.ULID]*metadata.DeletionMark(nil), error(nil)) + finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) // Mock the store to simulate each block is queried from a different store-gateway. 
 	gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
@@ -1243,9 +1242,9 @@
 type blocksFinderMock struct {
 	mock.Mock
 }
 
-func (m *blocksFinderMock) GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta, map[ulid.ULID]*metadata.DeletionMark, error) {
-	args := m.Called(userID, minT, maxT)
-	return args.Get(0).([]*BlockMeta), args.Get(1).(map[ulid.ULID]*metadata.DeletionMark), args.Error(2)
+func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
+	args := m.Called(ctx, userID, minT, maxT)
+	return args.Get(0).(bucketindex.Blocks), args.Get(1).(map[ulid.ULID]*bucketindex.BlockDeletionMark), args.Error(2)
 }
 
 type storeGatewayClientMock struct {
diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go
new file mode 100644
index 00000000000..ef6f6317ef8
--- /dev/null
+++ b/pkg/storage/tsdb/bucketindex/index.go
@@ -0,0 +1,198 @@
+package bucketindex
+
+import (
+	"fmt"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util"
+)
+
+const (
+	IndexFilename = "bucket-index.json"
+	IndexVersion1 = 1
+
+	SegmentsFormatUnknown = ""
+
+	// SegmentsFormat1Based6Digits defines segments numbered with 6-digit numbers in a sequence starting from number 1,
+	// e.g. (000001, 000002, 000003).
+	SegmentsFormat1Based6Digits = "1b6d"
+)
+
+// Index contains all known blocks and markers of a tenant.
+type Index struct {
+	// Version of the index format.
+	Version int `json:"version"`
+
+	// List of complete blocks (partial blocks are excluded from the index).
+	Blocks []*Block `json:"blocks"`
+
+	// List of block deletion marks.
+	BlockDeletionMarks []*BlockDeletionMark `json:"block_deletion_marks"`
+
+	// UpdatedAt is a unix timestamp (seconds precision) of when the index was last
+	// updated (written to the storage).
+	UpdatedAt int64 `json:"updated_at"`
+}
+
+// Block holds the information about a block in the index.
+type Block struct {
+	// Block ID.
+	ID ulid.ULID `json:"block_id"`
+
+	// MinTime and MaxTime specify the time range all samples in the block are in (millis precision).
+	MinTime int64 `json:"min_time"`
+	MaxTime int64 `json:"max_time"`
+
+	// SegmentsFormat and SegmentsNum store the format and number of chunk segments
+	// in the block, if they match a known pattern. We don't store the full segment
+	// files list in order to keep the index small. SegmentsFormat is empty if segments
+	// are unknown or don't match a known format.
+	SegmentsFormat string `json:"segments_format,omitempty"`
+	SegmentsNum    int    `json:"segments_num,omitempty"`
+
+	// UploadedAt is a unix timestamp (seconds precision) of when the block upload
+	// to the storage was completed.
+	UploadedAt int64 `json:"uploaded_at"`
+}
+
+func (m *Block) GetUploadedAt() time.Time {
+	return time.Unix(m.UploadedAt, 0)
+}
+
+// ThanosMeta returns a block meta based on the known information in the index.
+// The returned meta doesn't include all original meta.json data but only a subset
+// of it.
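+// In particular, the segment files list is rebuilt from SegmentsFormat and
+// SegmentsNum (when the format is known), and the only external label set on
+// the returned meta is the Cortex tenant ID.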
+func (m *Block) ThanosMeta(userID string) metadata.Meta { + return metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: m.ID, + MinTime: m.MinTime, + MaxTime: m.MaxTime, + Version: metadata.TSDBVersion1, + }, + Thanos: metadata.Thanos{ + Version: metadata.ThanosVersion1, + Labels: map[string]string{ + cortex_tsdb.TenantIDExternalLabel: userID, + }, + SegmentFiles: m.thanosMetaSegmentFiles(), + }, + } +} + +func (m *Block) thanosMetaSegmentFiles() (files []string) { + if m.SegmentsFormat == SegmentsFormat1Based6Digits { + for i := 1; i <= m.SegmentsNum; i++ { + files = append(files, fmt.Sprintf("%06d", i)) + } + } + + return files +} + +func (m *Block) String() string { + minT := util.TimeFromMillis(m.MinTime).UTC() + maxT := util.TimeFromMillis(m.MaxTime).UTC() + + return fmt.Sprintf("%s (min time: %s max time: %s)", m.ID, minT.String(), maxT.String()) +} + +func BlockFromThanosMeta(meta metadata.Meta) *Block { + segmentsFormat, segmentsNum := detectBlockSegmentsFormat(meta) + + return &Block{ + ID: meta.ULID, + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + SegmentsFormat: segmentsFormat, + SegmentsNum: segmentsNum, + } +} + +func detectBlockSegmentsFormat(meta metadata.Meta) (string, int) { + if num, ok := detectBlockSegmentsFormat1Based6Digits(meta); ok { + return SegmentsFormat1Based6Digits, num + } + + return "", 0 +} + +func detectBlockSegmentsFormat1Based6Digits(meta metadata.Meta) (int, bool) { + // Check the (deprecated) SegmentFiles. + if len(meta.Thanos.SegmentFiles) > 0 { + for i, f := range meta.Thanos.SegmentFiles { + if fmt.Sprintf("%06d", i+1) != f { + return 0, false + } + } + return len(meta.Thanos.SegmentFiles), true + } + + // Check the Files. + if len(meta.Thanos.Files) > 0 { + num := 0 + for _, file := range meta.Thanos.Files { + if !strings.HasPrefix(file.RelPath, block.ChunksDirname+string(filepath.Separator)) { + continue + } + if fmt.Sprintf("%s%s%06d", block.ChunksDirname, string(filepath.Separator), num+1) != file.RelPath { + return 0, false + } + num++ + } + + if num > 0 { + return num, true + } + } + + return 0, false +} + +// BlockDeletionMark holds the information about a block's deletion mark in the index. +type BlockDeletionMark struct { + // Block ID. + ID ulid.ULID `json:"block_id"` + + // DeletionTime is a unix timestamp (seconds precision) of when the block was marked to be deleted. + DeletionTime int64 `json:"deletion_time"` +} + +func BlockDeletionMarkFromThanosMarker(mark *metadata.DeletionMark) *BlockDeletionMark { + return &BlockDeletionMark{ + ID: mark.ID, + DeletionTime: mark.DeletionTime, + } +} + +// Blocks holds a set of blocks in the index. No ordering guaranteed. 
+type Blocks []*Block + +func (s Blocks) GetULIDs() []ulid.ULID { + ids := make([]ulid.ULID, len(s)) + for i, m := range s { + ids[i] = m.ID + } + return ids +} + +func (s Blocks) String() string { + b := strings.Builder{} + + for idx, m := range s { + if idx > 0 { + b.WriteString(", ") + } + b.WriteString(m.String()) + } + + return b.String() +} diff --git a/pkg/storage/tsdb/bucketindex/index_test.go b/pkg/storage/tsdb/bucketindex/index_test.go new file mode 100644 index 00000000000..7c50cd42620 --- /dev/null +++ b/pkg/storage/tsdb/bucketindex/index_test.go @@ -0,0 +1,265 @@ +package bucketindex + +import ( + "testing" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestDetectBlockSegmentsFormat(t *testing.T) { + tests := map[string]struct { + meta metadata.Meta + expectedFormat string + expectedNum int + }{ + "meta.json without SegmentFiles and Files": { + meta: metadata.Meta{}, + expectedFormat: SegmentsFormatUnknown, + expectedNum: 0, + }, + "meta.json with SegmentFiles, 0 based 6 digits": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + SegmentFiles: []string{ + "000000", + "000001", + "000002", + }, + }, + }, + expectedFormat: SegmentsFormatUnknown, + expectedNum: 0, + }, + "meta.json with SegmentFiles, 1 based 6 digits": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + SegmentFiles: []string{ + "000001", + "000002", + "000003", + }, + }, + }, + expectedFormat: SegmentsFormat1Based6Digits, + expectedNum: 3, + }, + "meta.json with SegmentFiles, 1 based 6 digits but non consecutive": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + SegmentFiles: []string{ + "000001", + "000003", + "000004", + }, + }, + }, + expectedFormat: SegmentsFormatUnknown, + expectedNum: 0, + }, + "meta.json with Files, 0 based 6 digits": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + Files: []metadata.File{ + {RelPath: "index"}, + {RelPath: "chunks/000000"}, + {RelPath: "chunks/000001"}, + {RelPath: "chunks/000002"}, + {RelPath: "tombstone"}, + }, + }, + }, + expectedFormat: SegmentsFormatUnknown, + expectedNum: 0, + }, + "meta.json with Files, 1 based 6 digits": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + Files: []metadata.File{ + {RelPath: "index"}, + {RelPath: "chunks/000001"}, + {RelPath: "chunks/000002"}, + {RelPath: "chunks/000003"}, + {RelPath: "tombstone"}, + }, + }, + }, + expectedFormat: SegmentsFormat1Based6Digits, + expectedNum: 3, + }, + "meta.json with Files, 1 based 6 digits but non consecutive": { + meta: metadata.Meta{ + Thanos: metadata.Thanos{ + Files: []metadata.File{ + {RelPath: "index"}, + {RelPath: "chunks/000001"}, + {RelPath: "chunks/000003"}, + {RelPath: "chunks/000004"}, + {RelPath: "tombstone"}, + }, + }, + }, + expectedFormat: SegmentsFormatUnknown, + expectedNum: 0, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + actualFormat, actualNum := detectBlockSegmentsFormat(testData.meta) + assert.Equal(t, testData.expectedFormat, actualFormat) + assert.Equal(t, testData.expectedNum, actualNum) + }) + } +} + +func TestBlockFromThanosMeta(t *testing.T) { + blockID := ulid.MustNew(1, nil) + + tests := map[string]struct { + meta metadata.Meta + expected Block + }{ + "meta.json without SegmentFiles and Files": { + meta: metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: blockID, + MinTime: 10, + MaxTime: 20, + }, + Thanos: metadata.Thanos{}, + }, + expected: Block{ + ID: blockID, + MinTime: 10, + 
MaxTime: 20, + SegmentsFormat: SegmentsFormatUnknown, + SegmentsNum: 0, + }, + }, + "meta.json with SegmentFiles": { + meta: metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: blockID, + MinTime: 10, + MaxTime: 20, + }, + Thanos: metadata.Thanos{ + SegmentFiles: []string{ + "000001", + "000002", + "000003", + }, + }, + }, + expected: Block{ + ID: blockID, + MinTime: 10, + MaxTime: 20, + SegmentsFormat: SegmentsFormat1Based6Digits, + SegmentsNum: 3, + }, + }, + "meta.json with Files": { + meta: metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: blockID, + MinTime: 10, + MaxTime: 20, + }, + Thanos: metadata.Thanos{ + Files: []metadata.File{ + {RelPath: "index"}, + {RelPath: "chunks/000001"}, + {RelPath: "chunks/000002"}, + {RelPath: "chunks/000003"}, + {RelPath: "tombstone"}, + }, + }, + }, + expected: Block{ + ID: blockID, + MinTime: 10, + MaxTime: 20, + SegmentsFormat: SegmentsFormat1Based6Digits, + SegmentsNum: 3, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, *BlockFromThanosMeta(testData.meta)) + }) + } +} + +func TestBlock_ThanosMeta(t *testing.T) { + blockID := ulid.MustNew(1, nil) + userID := "user-1" + + tests := map[string]struct { + block Block + expected metadata.Meta + }{ + "block with segment files format 1 based 6 digits": { + block: Block{ + ID: blockID, + MinTime: 10, + MaxTime: 20, + SegmentsFormat: SegmentsFormat1Based6Digits, + SegmentsNum: 3, + }, + expected: metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: blockID, + MinTime: 10, + MaxTime: 20, + Version: metadata.TSDBVersion1, + }, + Thanos: metadata.Thanos{ + Version: metadata.ThanosVersion1, + Labels: map[string]string{ + "__org_id__": userID, + }, + SegmentFiles: []string{ + "000001", + "000002", + "000003", + }, + }, + }, + }, + "block with unknown segment files format": { + block: Block{ + ID: blockID, + MinTime: 10, + MaxTime: 20, + SegmentsFormat: SegmentsFormatUnknown, + SegmentsNum: 0, + }, + expected: metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: blockID, + MinTime: 10, + MaxTime: 20, + Version: metadata.TSDBVersion1, + }, + Thanos: metadata.Thanos{ + Version: metadata.ThanosVersion1, + Labels: map[string]string{ + "__org_id__": userID, + }, + }, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, testData.block.ThanosMeta(userID)) + }) + } +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go index d6b4180a285..b47c28eba49 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go @@ -198,10 +198,11 @@ func ReadFromDir(dir string) (*Meta, error) { if err != nil { return nil, err } - return read(f) + return Read(f) } -func read(rc io.ReadCloser) (_ *Meta, err error) { +// Read the block meta from the given reader. 
+func Read(rc io.ReadCloser) (_ *Meta, err error) {
 	defer runutil.ExhaustCloseWithErrCapture(&err, rc, "close meta JSON")
 
 	var m Meta
diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/objstore.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/objstore.go
index cdab0925a6c..cf5317d43d7 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/objstore/objstore.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/objstore.go
@@ -118,7 +118,7 @@ func TryToGetSize(r io.Reader) (int64, error) {
 	case *strings.Reader:
 		return f.Size(), nil
 	}
-	return 0, errors.New("unsupported type of io.Reader")
+	return 0, errors.Errorf("unsupported type of io.Reader: %T", r)
 }
 
 // UploadDir uploads all files in srcdir to the bucket into a top-level directory
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d64807aab8a..5cae7f1f4c9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -566,7 +566,7 @@ github.com/stretchr/objx
 github.com/stretchr/testify/assert
 github.com/stretchr/testify/mock
 github.com/stretchr/testify/require
-# github.com/thanos-io/thanos v0.13.1-0.20201125124348-2008ef009e88
+# github.com/thanos-io/thanos v0.13.1-0.20201130180807-84afc97e7d58
 ## explicit
 github.com/thanos-io/thanos/pkg/block
 github.com/thanos-io/thanos/pkg/block/indexheader
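
For illustration, a minimal sketch of how the new bucketindex types compose, assuming the package exactly as added above (the ULID, timestamps, and user ID are placeholders): a Thanos meta.json is reduced to the compact index entry, and the resulting Index is the JSON payload that would be stored as bucket-index.json (IndexFilename).

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

func main() {
	// A minimal Thanos meta.json whose chunk segments follow the
	// 1-based 6-digit naming convention (placeholder ULID and times).
	meta := metadata.Meta{
		BlockMeta: tsdb.BlockMeta{
			ULID:    ulid.MustNew(1, nil),
			MinTime: 10,
			MaxTime: 20,
		},
		Thanos: metadata.Thanos{
			SegmentFiles: []string{"000001", "000002"},
		},
	}

	// Convert to the compact index entry: only the segments format and
	// count survive the conversion, not the full file list.
	b := bucketindex.BlockFromThanosMeta(meta)
	b.UploadedAt = time.Now().Unix()

	idx := bucketindex.Index{
		Version:   bucketindex.IndexVersion1,
		Blocks:    []*bucketindex.Block{b},
		UpdatedAt: time.Now().Unix(),
	}

	// The JSON payload that would be written as bucket-index.json.
	out, err := json.MarshalIndent(idx, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))

	// On the read path, the segment file names can be re-derived.
	fmt.Println(b.ThanosMeta("user-1").Thanos.SegmentFiles) // [000001 000002]
}

The round trip at the end shows the design choice: storing a format identifier plus a count is enough to reconstruct the segment file names later, which keeps the per-tenant index small even for tenants with many blocks.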
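
The vendored change exporting metadata.Read (previously the unexported read) lets callers decode a meta.json from any io.ReadCloser, not just from a file on disk. A hedged sketch of the kind of caller this enables; readMetaFromBucket is a hypothetical helper, not an API introduced by this diff:

package metaread

import (
	"context"
	"path"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// readMetaFromBucket decodes a block's meta.json straight from a bucket
// reader, without downloading the whole block to disk first.
func readMetaFromBucket(ctx context.Context, bkt objstore.Bucket, blockID string) (*metadata.Meta, error) {
	// "meta.json" is the well-known metadata filename inside each block directory.
	r, err := bkt.Get(ctx, path.Join(blockID, "meta.json"))
	if err != nil {
		return nil, err
	}
	// metadata.Read takes ownership of the reader and closes it, capturing
	// any close error alongside the decode error.
	return metadata.Read(r)
}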