diff --git a/CHANGELOG.md b/CHANGELOG.md index 294c6a2d46a..0497f2d1c2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,15 +6,16 @@ * [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583 * [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583 * [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250 -* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers and store-gateways. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 -* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier. When enabled, the querier will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics have been added: #3614 +* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers and store-gateways. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 +* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier and store-gateway. When enabled, the querier and store-gateway will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier: #3614 #3625 * `cortex_bucket_index_loads_total` * `cortex_bucket_index_load_failures_total` * `cortex_bucket_index_load_duration_seconds` * `cortex_bucket_index_loaded` -* [ENHANCEMENT] Compactor: exported the following metrics. #3583 - * `cortex_bucket_blocks_count`: Total number of blocks per tenant in the bucket. Includes blocks marked for deletion. +* [ENHANCEMENT] Compactor: exported the following metrics. #3583 #3625 + * `cortex_bucket_blocks_count`: Total number of blocks per tenant in the bucket. Includes blocks marked for deletion, but not partial blocks. * `cortex_bucket_blocks_marked_for_deletion_count`: Total number of blocks per tenant marked for deletion in the bucket. + * `cortex_bucket_blocks_partials_count`: Total number of partial blocks. * `cortex_bucket_index_last_successful_update_timestamp_seconds`: Timestamp of the last successful update of a tenant's bucket index. * [ENHANCEMENT] Ruler: Add `cortex_prometheus_last_evaluation_samples` to expose the number of samples generated by a rule group per tenant. 
#3582 * [ENHANCEMENT] Memberlist: add status page (/memberlist) with available details about memberlist-based KV store and memberlist cluster. It's also possible to view KV values in Go struct or JSON format, or download for inspection. #3575 diff --git a/docs/blocks-storage/bucket-index.md b/docs/blocks-storage/bucket-index.md index ce006e939ed..e825a9f8ab0 100644 --- a/docs/blocks-storage/bucket-index.md +++ b/docs/blocks-storage/bucket-index.md @@ -5,18 +5,18 @@ weight: 5 slug: bucket-index --- -The bucket index is a **per-tenant file containing the list of blocks and block deletion marks** in the storage. The bucket index itself is stored in the backend object storage, is periodically updated by the compactor and used by queriers to discover blocks in the storage. +The bucket index is a **per-tenant file containing the list of blocks and block deletion marks** in the storage. The bucket index itself is stored in the backend object storage, is periodically updated by the compactor, and used by queriers and store-gateways to discover blocks in the storage. The bucket index usage is **optional** and can be enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true` (or its respective YAML config option). ## Benefits -The [querier](./querier.md) needs to have an almost up-to-date view over the entire storage bucket, in order to find the right blocks to lookup at query time. Because of this, querier needs to periodically scan the bucket to look for new blocks uploaded by ingester or compactor, and blocks deleted (or marked for deletion) by compactor. +The [querier](./querier.md) and [store-gateway](./store-gateway.md) need to have an almost up-to-date view over the entire storage bucket, in order to find the right blocks to look up at query time (querier) and to load each block's [index-header](./binary-index-header.md) (store-gateway). Because of this, they need to periodically scan the bucket to look for new blocks uploaded by ingesters or the compactor, and blocks deleted (or marked for deletion) by the compactor. -When this bucket index is enabled, the querier periodically look up the per-tenant bucket index instead of scanning the bucket via "list objects" operations. This brings few benefits: +When the bucket index is enabled, the querier and store-gateway periodically look up the per-tenant bucket index instead of scanning the bucket via "list objects" operations. This brings a few benefits: -1. Reduced number of API calls to the object storage by querier -2. No "list objects" storage API calls done by querier +1. Reduced number of API calls to the object storage by the querier and store-gateway +2. No "list objects" storage API calls done by the querier and store-gateway 3. The [querier](./querier.md) is up and running immediately after the startup (no need to run an initial bucket scan) ## Structure of the index @@ -42,7 +42,7 @@ The [querier](./querier.md), at query time, checks whether the bucket index for _Given it's a small file, lazy downloading it doesn't significantly impact on first query performances, but allows to get a querier up and running without pre-downloading every tenant's bucket index. Moreover, if the [metadata cache](./querier.md#metadata-cache) is enabled, the bucket index will be cached for a short time in a shared cache, reducing the actual latency and number of API calls to the object storage in case multiple queriers will fetch the same tenant's bucket index in a short time._
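As a rough sketch of this lazy-load path (illustrative only: the `loader` variable, the surrounding function, and the error handling are assumptions layered on the `bucketindex.Loader` API introduced elsewhere in this PR):

```go
// Illustrative fragment: querier-side block discovery via the bucket index.
// Assumes ctx, userID and a started *bucketindex.Loader named loader.
idx, err := loader.GetIndex(ctx, userID)
if err != nil {
	// bucketindex.ErrIndexNotFound is expected for a tenant whose bucket
	// index has not been written by the compactor yet.
	return nil, err
}
for _, b := range idx.Blocks {
	_ = b // Keep only the blocks overlapping the query time range.
}
```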
-![Querier - Bucket index](/images/blocks-storage/bucket-index-querier-logic.png) +![Querier - Bucket index](/images/blocks-storage/bucket-index-querier-workflow.png) While in-memory, a background process will keep it **updated at periodic intervals**, so that subsequent queries from the same tenant to the same querier instance will use the cached (and periodically updated) bucket index. There are two config options involved: @@ -55,3 +55,7 @@ While in-memory, a background process will keep it **updated at periodic interva If a bucket index is unused for a long time (configurable via `-blocks-storage.bucket-store.bucket-index.idle-timeout`), e.g. because that querier instance is not receiving any query from the tenant, the querier will offload it, stopping to keep it updated at regular intervals. This is particularly for tenants which are resharded to different queriers when [shuffle sharding](../guides/shuffle-sharding.md) is enabled. Finally, the querier, at query time, checks how old is a bucket index (based on its `updated_at`) and fail a query if its age is older than `-blocks-storage.bucket-store.bucket-index.max-stale-period`. This circuit breaker is used to ensure queriers will not return any partial query results due to a stale view over the long-term storage. + +## How it's used by the store-gateway + +The [store-gateway](./store-gateway.md), at startup and periodically, fetches the bucket index for each tenant belonging to its shard and uses it as the source of truth for the blocks (and deletion marks) in the storage. This removes the need to periodically scan the bucket to discover blocks belonging to its shard. diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index b83e26b4cbf..b4f641496ca 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -10,7 +10,7 @@ slug: compactor The **compactor** is an service which is responsible to: - Compact multiple blocks of a given tenant into a single optimized larger block. This helps to reduce storage costs (deduplication, index size reduction), and increase query speed (querying fewer blocks is faster). -- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) to discover new blocks in the storage. +- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) and [store-gateways](./store-gateway.md) to discover new blocks in the storage. The compactor is **stateless**. diff --git a/docs/blocks-storage/compactor.template b/docs/blocks-storage/compactor.template index 7fd64e60edc..8870afaaa83 100644 --- a/docs/blocks-storage/compactor.template +++ b/docs/blocks-storage/compactor.template @@ -10,7 +10,7 @@ slug: compactor The **compactor** is an service which is responsible to: - Compact multiple blocks of a given tenant into a single optimized larger block. This helps to reduce storage costs (deduplication, index size reduction), and increase query speed (querying fewer blocks is faster). -- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) to discover new blocks in the storage.
+- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) and [store-gateways](./store-gateway.md) to discover new blocks in the storage. The compactor is **stateless**. diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 13895f0fc3b..be60cb051e9 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -365,8 +365,9 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.sync-dir [sync_dir: | default = "tsdb-sync"] - # How frequently scan the bucket to look for changes (new blocks shipped by - # ingesters and blocks removed by retention or compaction). 0 disables it. + # How frequently to scan the bucket - or fetch the bucket index (if enabled) - + # to look for changes (new blocks shipped by ingesters and blocks removed by + # retention or compaction). 0 disables it. # CLI flag: -blocks-storage.bucket-store.sync-interval [sync_interval: | default = 5m] @@ -634,22 +635,24 @@ blocks_storage: [ignore_deletion_mark_delay: | default = 6h] bucket_index: - # True to enable querier to discover blocks in the storage via bucket - # index instead of bucket scanning. + # True to enable the querier and store-gateway to discover blocks in the + # storage via bucket index instead of bucket scanning. # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled [enabled: | default = false] - # How frequently a cached bucket index should be refreshed. + # How frequently a cached bucket index should be refreshed. This option is + # used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval [update_on_stale_interval: | default = 15m] # How frequently a bucket index, which previously failed to load, should - # be tried to load again. + # be tried to load again. This option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout # expires, the unused bucket index is removed from the in-memory cache. + # This option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 821b9134484..709285619ef 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -13,6 +13,13 @@ The store-gateway is **semi-stateful**. ## How it works +The store-gateway needs to have an almost up-to-date view over the storage bucket, in order to discover blocks belonging to its shard. The store-gateway can keep the bucket view updated in two different ways: + +1. Periodically scanning the bucket (default) +2. Periodically downloading the [bucket index](./bucket-index.md) + +### Bucket index disabled (default) + At startup **store-gateways** iterate over the entire storage bucket to discover blocks for all tenants and download the `meta.json` and index-header for each block. During this initial bucket synchronization phase, the store-gateway `/ready` readiness probe endpoint will fail. While running, store-gateways periodically rescan the storage bucket to discover new blocks (uploaded by the ingesters and [compactor](./compactor.md)) and blocks marked for deletion or fully deleted since the last scan (as a result of compaction). The frequency at which this occurs is configured via `-blocks-storage.bucket-store.sync-interval`.
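The periodic re-scan just described boils down to a ticker loop; a minimal, self-contained sketch of the pattern follows (the `sync` callback and function names are hypothetical; the real store-gateway sync logic is considerably more involved):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSyncLoop re-runs sync every interval, mirroring the documented contract
// of -blocks-storage.bucket-store.sync-interval (a value of 0 disables it).
func runSyncLoop(ctx context.Context, interval time.Duration, sync func(context.Context) error) {
	if interval <= 0 {
		return // 0 disables the periodic re-scan.
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// On failure, simply retry at the next tick.
			_ = sync(ctx)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	runSyncLoop(ctx, time.Second, func(context.Context) error {
		fmt.Println("sync")
		return nil
	})
}
```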
@@ -21,6 +28,12 @@ The blocks chunks and the entire index are never fully downloaded by the store-g _For more information about the index-header, please refer to [Binary index-header documentation](./binary-index-header.md)._ +### Bucket index enabled + +When the bucket index is enabled, the overall workflow is the same but, instead of iterating over the bucket objects, the store-gateway fetches the [bucket index](./bucket-index.md) for each tenant belonging to its shard in order to discover each tenant's blocks and block deletion marks. + +_For more information about the bucket index, please refer to [bucket index documentation](./bucket-index.md)._ + ## Blocks sharding and replication The store-gateway optionally supports blocks sharding. Sharding can be used to horizontally scale blocks in a large cluster without hitting any vertical scalability limit. @@ -399,8 +412,9 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.sync-dir [sync_dir: | default = "tsdb-sync"] - # How frequently scan the bucket to look for changes (new blocks shipped by - # ingesters and blocks removed by retention or compaction). 0 disables it. + # How frequently to scan the bucket - or fetch the bucket index (if enabled) - + # to look for changes (new blocks shipped by ingesters and blocks removed by + # retention or compaction). 0 disables it. # CLI flag: -blocks-storage.bucket-store.sync-interval [sync_interval: | default = 5m] @@ -668,22 +682,24 @@ blocks_storage: [ignore_deletion_mark_delay: | default = 6h] bucket_index: - # True to enable querier to discover blocks in the storage via bucket - # index instead of bucket scanning. + # True to enable the querier and store-gateway to discover blocks in the + # storage via bucket index instead of bucket scanning. # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled [enabled: | default = false] - # How frequently a cached bucket index should be refreshed. + # How frequently a cached bucket index should be refreshed. This option is + # used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval [update_on_stale_interval: | default = 15m] # How frequently a bucket index, which previously failed to load, should - # be tried to load again. + # be tried to load again. This option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout # expires, the unused bucket index is removed from the in-memory cache. + # This option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/docs/blocks-storage/store-gateway.template b/docs/blocks-storage/store-gateway.template index 49526b55820..5f4344809c1 100644 --- a/docs/blocks-storage/store-gateway.template +++ b/docs/blocks-storage/store-gateway.template @@ -13,6 +13,13 @@ The store-gateway is **semi-stateful**. ## How it works +The store-gateway needs to have an almost up-to-date view over the storage bucket, in order to discover blocks belonging to its shard. The store-gateway can keep the bucket view updated in two different ways: + +1. Periodically scanning the bucket (default) +2. Periodically downloading the [bucket index](./bucket-index.md)
+ +### Bucket index disabled (default) + At startup **store-gateways** iterate over the entire storage bucket to discover blocks for all tenants and download the `meta.json` and index-header for each block. During this initial bucket synchronization phase, the store-gateway `/ready` readiness probe endpoint will fail. While running, store-gateways periodically rescan the storage bucket to discover new blocks (uploaded by the ingesters and [compactor](./compactor.md)) and blocks marked for deletion or fully deleted since the last scan (as a result of compaction). The frequency at which this occurs is configured via `-blocks-storage.bucket-store.sync-interval`. @@ -21,6 +28,12 @@ The blocks chunks and the entire index are never fully downloaded by the store-g _For more information about the index-header, please refer to [Binary index-header documentation](./binary-index-header.md)._ +### Bucket index enabled + +When the bucket index is enabled, the overall workflow is the same but, instead of iterating over the bucket objects, the store-gateway fetches the [bucket index](./bucket-index.md) for each tenant belonging to its shard in order to discover each tenant's blocks and block deletion marks. + +_For more information about the bucket index, please refer to [bucket index documentation](./bucket-index.md)._ + ## Blocks sharding and replication The store-gateway optionally supports blocks sharding. Sharding can be used to horizontally scale blocks in a large cluster without hitting any vertical scalability limit. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a3b54d92d37..3f4915597be 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3682,8 +3682,9 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.sync-dir [sync_dir: | default = "tsdb-sync"] - # How frequently scan the bucket to look for changes (new blocks shipped by - # ingesters and blocks removed by retention or compaction). 0 disables it. + # How frequently to scan the bucket - or fetch the bucket index (if enabled) - to + # look for changes (new blocks shipped by ingesters and blocks removed by + # retention or compaction). 0 disables it. # CLI flag: -blocks-storage.bucket-store.sync-interval [sync_interval: | default = 5m] @@ -3950,22 +3951,24 @@ bucket_store: [ignore_deletion_mark_delay: | default = 6h] bucket_index: - # True to enable querier to discover blocks in the storage via bucket index - # instead of bucket scanning. + # True to enable the querier and store-gateway to discover blocks in the storage + # via bucket index instead of bucket scanning. # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled [enabled: | default = false] - # How frequently a cached bucket index should be refreshed. + # How frequently a cached bucket index should be refreshed. This option is + # used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval [update_on_stale_interval: | default = 15m] # How frequently a bucket index, which previously failed to load, should be - # tried to load again. + # tried to load again. This option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout - # expires, the unused bucket index is removed from the in-memory cache.
+ # expires, the unused bucket index is removed from the in-memory cache. This + # option is used only by the querier. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 1458dd05d4d..99e03bd454a 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -64,6 +64,6 @@ Currently experimental features are: - Tenant Deletion in Purger, for blocks storage. - Query-frontend: query stats tracking (`-frontend.query-stats-enabled`) - Blocks storage bucket index - - The bucket index support in the querier (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental + - The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions - Querier: tenant federation diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index b419a2d08bf..fbfc0acf99d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -50,6 +50,7 @@ type BlocksCleaner struct { blocksFailedTotal prometheus.Counter tenantBlocks *prometheus.GaugeVec tenantMarkedBlocks *prometheus.GaugeVec + tenantPartialBlocks *prometheus.GaugeVec tenantBucketIndexLastUpdate *prometheus.GaugeVec } @@ -89,12 +90,16 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use // metrics can be tracked. tenantBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_count", - Help: "Total number of blocks in the bucket. Includes blocks marked for deletion.", + Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.", }, []string{"user"}), tenantMarkedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_marked_for_deletion_count", Help: "Total number of blocks marked for deletion in the bucket.", }, []string{"user"}), + tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_bucket_blocks_partials_count", + Help: "Total number of partial blocks.", + }, []string{"user"}), tenantBucketIndexLastUpdate: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_index_last_successful_update_timestamp_seconds", Help: "Timestamp of the last successful update of a tenant's bucket index.", @@ -154,6 +159,7 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error { if !isActive[userID] && !isDeleted[userID] { c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) + c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) } } @@ -216,6 +222,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID // to delete them again. c.tenantBlocks.WithLabelValues(userID).Set(float64(failed)) c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(failed)) + c.tenantPartialBlocks.WithLabelValues(userID).Set(0) return errors.Errorf("failed to delete %d blocks", failed) } @@ -223,6 +230,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID // Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) + c.tenantPartialBlocks.DeleteLabelValues(userID) if deletedBlocks > 0 { level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks) @@ -339,11 +347,14 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) + c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() return nil } +// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map +// is updated accordingly. func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket *bucket.UserBucketClient, userLogger log.Logger) { for blockID, blockErr := range partials { // We can safely delete only blocks which are partial because the meta.json is missing. @@ -353,7 +364,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map // We can safely delete only partial blocks with a deletion mark. err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{}) - if err == metadata.ErrorMarkerNotFound { + if errors.Is(err, metadata.ErrorMarkerNotFound) { continue } if err != nil { @@ -371,6 +382,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map // Remove the block from the bucket index too. idx.RemoveBlock(blockID) + delete(partials, blockID) c.blocksCleanedTotal.Inc() level.Info(userLogger).Log("msg", "deleted partial block marked for deletion", "block", blockID) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 87c7fac1524..44e8af0be37 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -5,8 +5,6 @@ import ( "crypto/rand" "errors" "fmt" - "io/ioutil" - "os" "path" "strings" "testing" @@ -23,13 +21,25 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) +type testBlocksCleanerOptions struct { + concurrency int + markersMigrationEnabled bool + tenantDeletionDelay time.Duration + user4FilesExist bool // User 4 has "FinishedTime" in tenant deletion marker set to "1h" ago. +} + +func (o testBlocksCleanerOptions) String() string { + return fmt.Sprintf("concurrency=%d, markers migration enabled=%v, tenant deletion delay=%v", + o.concurrency, o.markersMigrationEnabled, o.tenantDeletionDelay) +} + func TestBlocksCleaner(t *testing.T) { for _, options := range []testBlocksCleanerOptions{ {concurrency: 1, tenantDeletionDelay: 0, user4FilesExist: false}, @@ -47,20 +57,8 @@ func TestBlocksCleaner(t *testing.T) { } } -type testBlocksCleanerOptions struct { - concurrency int - markersMigrationEnabled bool - tenantDeletionDelay time.Duration - user4FilesExist bool // User 4 has "FinishedTime" in tenant deletion marker set to "1h" ago. 
-} - -func (o testBlocksCleanerOptions) String() string { - return fmt.Sprintf("concurrency=%d, markers migration enabled=%v, tenant deletion delay=%v", - o.concurrency, o.markersMigrationEnabled, o.tenantDeletionDelay) -} - func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) { - bucketClient := prepareFilesystemBucket(t) + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) // If the markers migration is enabled, then we create the fixture blocks without // writing the deletion marks in the global location, because they will be migrated @@ -199,7 +197,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions } assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion. + # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks. # TYPE cortex_bucket_blocks_count gauge cortex_bucket_blocks_count{user="user-1"} 2 cortex_bucket_blocks_count{user="user-2"} 1 @@ -207,16 +205,21 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1 cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 + # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. + # TYPE cortex_bucket_blocks_partials_count gauge + cortex_bucket_blocks_partials_count{user="user-1"} 2 + cortex_bucket_blocks_partials_count{user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", + "cortex_bucket_blocks_partials_count", )) } func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { const userID = "user-1" - bucketClient := prepareFilesystemBucket(t) + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) // Create blocks. @@ -280,7 +283,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { const userID = "user-1" - bucketClient := prepareFilesystemBucket(t) + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) // Create blocks. @@ -336,7 +339,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { } func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) { - bucketClient := prepareFilesystemBucket(t) + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) // Create blocks. @@ -359,7 +362,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar require.NoError(t, cleaner.cleanUsers(ctx, true)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion. + # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks. 
# TYPE cortex_bucket_blocks_count gauge cortex_bucket_blocks_count{user="user-1"} 2 cortex_bucket_blocks_count{user="user-2"} 1 @@ -367,9 +370,14 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0 cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 + # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. + # TYPE cortex_bucket_blocks_partials_count gauge + cortex_bucket_blocks_partials_count{user="user-1"} 0 + cortex_bucket_blocks_partials_count{user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", + "cortex_bucket_blocks_partials_count", )) // Override the users scanner to reconfigure it to only return a subset of users. @@ -382,15 +390,19 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar require.NoError(t, cleaner.cleanUsers(ctx, false)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion. + # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks. # TYPE cortex_bucket_blocks_count gauge cortex_bucket_blocks_count{user="user-1"} 3 # HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket. # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0 + # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. + # TYPE cortex_bucket_blocks_partials_count gauge + cortex_bucket_blocks_partials_count{user="user-1"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", + "cortex_bucket_blocks_partials_count", )) } @@ -406,18 +418,3 @@ func (m *mockBucketFailure) Delete(ctx context.Context, name string) error { } return m.Bucket.Delete(ctx, name) } - -func prepareFilesystemBucket(t *testing.T) objstore.Bucket { - // Create a temporary directory for local storage. - storageDir, err := ioutil.TempDir(os.TempDir(), "storage") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(storageDir)) - }) - - // Create a bucket client on the local storage. - bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - require.NoError(t, err) - - return bucketClient -} diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 2de9804a5cb..0f29d1c6d63 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -21,7 +22,7 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) { const userID = "user-1" ctx := context.Background() - bkt, _ := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Mock a bucket index. 
block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 15} @@ -121,7 +122,7 @@ func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) { ) ctx := context.Background() - bkt, _ := prepareFilesystemBucket(b) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(b) // Mock a bucket index. idx := &bucketindex.Index{ @@ -156,7 +157,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) const userID = "user-1" ctx := context.Background() - bkt, _ := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) finder := prepareBucketIndexBlocksFinder(t, bkt) blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20) @@ -169,7 +170,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) const userID = "user-1" ctx := context.Background() - bkt, _ := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) finder := prepareBucketIndexBlocksFinder(t, bkt) // Upload a corrupted bucket index. @@ -183,7 +184,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { const userID = "user-1" ctx := context.Background() - bkt, _ := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) finder := prepareBucketIndexBlocksFinder(t, bkt) require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, &bucketindex.Index{ diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index f75dc974e31..9573879f193 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -329,7 +329,7 @@ func (d *BucketScanBlocksFinder) scanUserBlocks(ctx context.Context, userID stri // The blocks scanner expects all blocks to be sorted by max time. sortBlocksByMaxTime(res) - // Convert deletion marks to our onw data type. + // Convert deletion marks to our own data type. 
marks := map[ulid.ULID]*bucketindex.BlockDeletionMark{} for id, m := range deletionMarkFilter.DeletionMarkBlocks() { marks[id] = bucketindex.BlockDeletionMarkFromThanosMarker(m) diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index a09f37fd143..e22e6cb9e4a 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -22,7 +22,6 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" - "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" @@ -488,7 +487,7 @@ func prepareBucketScanBlocksFinder(t *testing.T, cfg BucketScanBlocksFinderConfi require.NoError(t, os.RemoveAll(cacheDir)) }) - bkt, storageDir := prepareFilesystemBucket(t) + bkt, storageDir := cortex_testutil.PrepareFilesystemBucket(t) reg := prometheus.NewPedanticRegistry() cfg.CacheDir = cacheDir @@ -510,17 +509,3 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig { IgnoreDeletionMarksDelay: time.Hour, } } - -func prepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { - storageDir, err := ioutil.TempDir(os.TempDir(), "bucket") - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(storageDir)) - }) - - bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - require.NoError(t, err) - - return bkt, storageDir -} diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 6ca81430603..5c5f6cb5d4b 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -99,8 +99,8 @@ func (m *Block) GetUploadedAt() time.Time { // ThanosMeta returns a block meta based on the known information in the index. // The returned meta doesn't include all original meta.json data but only a subset // of it. -func (m *Block) ThanosMeta(userID string) metadata.Meta { - return metadata.Meta{ +func (m *Block) ThanosMeta(userID string) *metadata.Meta { + return &metadata.Meta{ BlockMeta: tsdb.BlockMeta{ ULID: m.ID, MinTime: m.MinTime, @@ -199,6 +199,15 @@ func (m *BlockDeletionMark) GetDeletionTime() time.Time { return time.Unix(m.DeletionTime, 0) } +// ThanosDeletionMark returns the Thanos deletion mark.
+func (m *BlockDeletionMark) ThanosDeletionMark() *metadata.DeletionMark { + return &metadata.DeletionMark{ + ID: m.ID, + Version: metadata.DeletionMarkVersion1, + DeletionTime: m.DeletionTime, + } +} + func BlockDeletionMarkFromThanosMarker(mark *metadata.DeletionMark) *BlockDeletionMark { return &BlockDeletionMark{ ID: mark.ID, diff --git a/pkg/storage/tsdb/bucketindex/index_test.go b/pkg/storage/tsdb/bucketindex/index_test.go index 4564e8859ca..7aad7de7cce 100644 --- a/pkg/storage/tsdb/bucketindex/index_test.go +++ b/pkg/storage/tsdb/bucketindex/index_test.go @@ -261,7 +261,7 @@ func TestBlock_ThanosMeta(t *testing.T) { tests := map[string]struct { block Block - expected metadata.Meta + expected *metadata.Meta }{ "block with segment files format 1 based 6 digits": { block: Block{ @@ -271,7 +271,7 @@ func TestBlock_ThanosMeta(t *testing.T) { SegmentsFormat: SegmentsFormat1Based6Digits, SegmentsNum: 3, }, - expected: metadata.Meta{ + expected: &metadata.Meta{ BlockMeta: tsdb.BlockMeta{ ULID: blockID, MinTime: 10, @@ -299,7 +299,7 @@ func TestBlock_ThanosMeta(t *testing.T) { SegmentsFormat: SegmentsFormatUnknown, SegmentsNum: 0, }, - expected: metadata.Meta{ + expected: &metadata.Meta{ BlockMeta: tsdb.BlockMeta{ ULID: blockID, MinTime: 10, @@ -323,6 +323,17 @@ func TestBlock_ThanosMeta(t *testing.T) { } } +func TestBlockDeletionMark_ThanosDeletionMark(t *testing.T) { + block1 := ulid.MustNew(1, nil) + mark := &BlockDeletionMark{ID: block1, DeletionTime: 1} + + assert.Equal(t, &metadata.DeletionMark{ + ID: block1, + Version: metadata.DeletionMarkVersion1, + DeletionTime: 1, + }, mark.ThanosDeletionMark()) +} + func TestBlockDeletionMarks_Clone(t *testing.T) { block1 := ulid.MustNew(1, nil) block2 := ulid.MustNew(2, nil) diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 2aff2184dd2..e4a0c2b1f5d 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -111,7 +111,9 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { l.cacheIndex(userID, nil, err) l.loadFailures.Inc() - if !errors.Is(err, ErrIndexNotFound) { + if errors.Is(err, ErrIndexNotFound) { + level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) + } else { level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err) } diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 96490609086..15d8d45686a 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -22,7 +23,7 @@ import ( func TestLoader_GetIndex_ShouldLazyLoadBucketIndex(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create a bucket index. idx := &Index{ @@ -87,7 +88,7 @@ func TestLoader_GetIndex_ShouldLazyLoadBucketIndex(t *testing.T) { func TestLoader_GetIndex_ShouldCacheError(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create the loader. 
loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg) @@ -123,7 +124,7 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) { func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create a bucket index. idx := &Index{ @@ -188,7 +189,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create the loader. cfg := LoaderConfig{ @@ -241,7 +242,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create a bucket index. idx := &Index{ @@ -297,7 +298,7 @@ func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) { func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create a bucket index. idx := &Index{ @@ -352,7 +353,7 @@ func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Create a bucket index. 
idx := &Index{ diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go index e22450c0132..4cdaba52a8e 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go @@ -8,10 +8,12 @@ import ( "github.com/oklog/ulid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) func TestGlobalMarkersBucket_Delete_ShouldSucceedIfDeletionMarkDoesNotExistInTheBlockButExistInTheGlobalLocation(t *testing.T) { - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) ctx := context.Background() bkt = BucketWithGlobalMarkers(bkt) diff --git a/pkg/storage/tsdb/bucketindex/markers_test.go b/pkg/storage/tsdb/bucketindex/markers_test.go index 2425c53b459..860c0d03f15 100644 --- a/pkg/storage/tsdb/bucketindex/markers_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" + + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) func TestBlockDeletionMarkFilepath(t *testing.T) { @@ -36,7 +38,7 @@ func TestIsBlockDeletionMarkFilename(t *testing.T) { } func TestMigrateBlockDeletionMarksToGlobalLocation(t *testing.T) { - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) ctx := context.Background() // Create some fixtures. diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index bcd698e8aa8..97953c960ad 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -21,12 +21,12 @@ var ( // ReadIndex reads, parses and returns a bucket index from the bucket. func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, logger log.Logger) (*Index, error) { - bkt = bucket.NewUserBucketClient(userID, bkt) + userBkt := bucket.NewUserBucketClient(userID, bkt) // Get the bucket index. - reader, err := bkt.Get(ctx, IndexCompressedFilename) + reader, err := userBkt.WithExpectedErrs(userBkt.IsObjNotFoundErr).Get(ctx, IndexCompressedFilename) if err != nil { - if bkt.IsObjNotFoundErr(err) { + if userBkt.IsObjNotFoundErr(err) { return nil, ErrIndexNotFound } return nil, errors.Wrap(err, "read bucket index") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index 521d25ae7bb..c76e6980740 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -11,10 +11,11 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) func TestReadIndex_ShouldReturnErrorIfIndexDoesNotExist(t *testing.T) { - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) idx, err := ReadIndex(context.Background(), bkt, "user-1", log.NewNopLogger()) require.Equal(t, ErrIndexNotFound, err) @@ -25,7 +26,7 @@ func TestReadIndex_ShouldReturnErrorIfIndexIsCorrupted(t *testing.T) { const userID = "user-1" ctx := context.Background() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Write a corrupted index. 
require.NoError(t, bkt.Upload(ctx, path.Join(userID, IndexCompressedFilename), strings.NewReader("invalid!}"))) @@ -41,7 +42,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { ctx := context.Background() logger := log.NewNopLogger() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Mock some blocks in the storage. bkt = BucketWithGlobalMarkers(bkt) @@ -71,7 +72,7 @@ func BenchmarkReadIndex(b *testing.B) { ctx := context.Background() logger := log.NewNopLogger() - bkt := prepareFilesystemBucket(b) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(b) // Mock some blocks and deletion marks in the storage. bkt = BucketWithGlobalMarkers(bkt) @@ -108,7 +109,7 @@ func BenchmarkReadIndex(b *testing.B) { func TestDeleteIndex_ShouldNotReturnErrorIfIndexDoesNotExist(t *testing.T) { ctx := context.Background() - bkt := prepareFilesystemBucket(t) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) assert.NoError(t, DeleteIndex(ctx, bkt, "user-1")) } diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 19c0e183646..416657d2598 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -212,6 +212,9 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid m := metadata.DeletionMark{} if err := metadata.ReadMarker(ctx, w.logger, w.bkt, id.String(), &m); err != nil { + if errors.Is(err, metadata.ErrorMarkerNotFound) { + return nil, errors.Wrap(ErrBlockDeletionMarkNotFound, err.Error()) + } if errors.Is(err, metadata.ErrorUnmarshalMarker) { return nil, errors.Wrap(ErrBlockDeletionMarkCorrupted, err.Error()) } diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index 670a7c544de..e116af272f0 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -3,8 +3,6 @@ package bucketindex import ( "bytes" "context" - "io/ioutil" - "os" "path" "testing" "time" @@ -20,14 +18,13 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" - "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) func TestUpdater_UpdateIndex(t *testing.T) { const userID = "user-1" - bkt := prepareFilesystemBucket(t) + bkt, _ := testutil.PrepareFilesystemBucket(t) ctx := context.Background() logger := log.NewNopLogger() @@ -69,7 +66,7 @@ func TestUpdater_UpdateIndex(t *testing.T) { func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { const userID = "user-1" - bkt := prepareFilesystemBucket(t) + bkt, _ := testutil.PrepareFilesystemBucket(t) ctx := context.Background() logger := log.NewNopLogger() @@ -98,7 +95,7 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { const userID = "user-1" - bkt := prepareFilesystemBucket(t) + bkt, _ := testutil.PrepareFilesystemBucket(t) ctx := context.Background() logger := log.NewNopLogger() @@ -127,7 +124,7 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { const userID = "user-1" - bkt := prepareFilesystemBucket(t) + bkt, _ := testutil.PrepareFilesystemBucket(t) ctx := context.Background() logger := log.NewNopLogger() @@ -155,7 +152,7 @@ func 
TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { const userID = "user-1" ctx := context.Background() - bkt := prepareFilesystemBucket(t) + bkt, _ := testutil.PrepareFilesystemBucket(t) for _, oldIdx := range []*Index{nil, {}} { w := NewUpdater(bkt, userID, log.NewNopLogger()) @@ -170,20 +167,6 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { } } -func prepareFilesystemBucket(t testing.TB) objstore.Bucket { - storageDir, err := ioutil.TempDir(os.TempDir(), "") - require.NoError(t, err) - - bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(storageDir)) - }) - - return objstore.BucketWithMetrics("test", bkt, nil) -} - func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 { metaFile := path.Join(userID, blockID.String(), block.MetaFilename)
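The `updater.go` hunk earlier in this patch maps a missing deletion mark onto `ErrBlockDeletionMarkNotFound` by wrapping the sentinel with `github.com/pkg/errors`, which keeps `errors.Is` matching it. A self-contained sketch of that pattern (all names below are illustrative, not from the change):

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

var errMarkerNotFound = errors.New("deletion mark not found")

func readMark() error {
	// Wrap the sentinel with the underlying error's message, as the updater
	// does with ErrBlockDeletionMarkNotFound; callers can still match it.
	underlying := fmt.Errorf("object does not exist")
	return errors.Wrap(errMarkerNotFound, underlying.Error())
}

func main() {
	fmt.Println(errors.Is(readMark(), errMarkerNotFound)) // Prints: true
}
```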
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 2f721f28e8e..2dbb3a2aece 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -243,7 +243,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.") f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") - f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") + f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 5*time.Minute, "How frequently to scan the bucket - or fetch the bucket index (if enabled) - to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "blocks-storage.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size - in bytes - of a per-tenant chunk pool, used to reduce memory allocations.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") @@ -284,9 +284,9 @@ type BucketIndexConfig struct { } func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "True to enable querier to discover blocks in the storage via bucket index instead of bucket scanning.") - f.DurationVar(&cfg.UpdateOnStaleInterval, prefix+"update-on-stale-interval", 15*time.Minute, "How frequently a cached bucket index should be refreshed.") - f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently a bucket index, which previously failed to load, should be tried to load again.") - f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache.") + f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "True to enable the querier and store-gateway to discover blocks in the storage via bucket index instead of bucket scanning.") + f.DurationVar(&cfg.UpdateOnStaleInterval, prefix+"update-on-stale-interval", 15*time.Minute, "How frequently a cached bucket index should be refreshed. This option is used only by the querier.") + f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used only by the querier.") + f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by the querier.") f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).") } diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go new file mode 100644 index 00000000000..94d12aed647 --- /dev/null +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -0,0 +1,26 @@ +package testutil + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" +) + +func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { + storageDir, err := ioutil.TempDir(os.TempDir(), "bucket") + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(storageDir)) + }) + + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + return objstore.BucketWithMetrics("test", bkt, nil), storageDir +}
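Call sites of the new helper all follow the same shape; a hypothetical test (name and body invented for illustration) looks like:

```go
package testutil

import "testing"

func TestUsingFilesystemBucket(t *testing.T) {
	// Cleanup of the temporary directory is registered on t by the helper,
	// so the test needs no explicit teardown.
	bkt, storageDir := PrepareFilesystemBucket(t)

	_ = bkt        // Behaves like any other objstore.Bucket.
	_ = storageDir // Handy when a test must inspect files on disk directly.
}
```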
+type BucketIndexMetadataFetcher struct { + userID string + bkt objstore.Bucket + strategy ShardingStrategy + logger log.Logger + filters []block.MetadataFilter + modifiers []block.MetadataModifier + metrics *fetcherMetrics +} + +func NewBucketIndexMetadataFetcher( + userID string, + bkt objstore.Bucket, + strategy ShardingStrategy, + logger log.Logger, + reg prometheus.Registerer, + filters []block.MetadataFilter, + modifiers []block.MetadataModifier, +) *BucketIndexMetadataFetcher { + return &BucketIndexMetadataFetcher{ + userID: userID, + bkt: bkt, + strategy: strategy, + logger: logger, + filters: filters, + modifiers: modifiers, + metrics: newFetcherMetrics(reg), + } +} + +// Fetch implements metadata.MetadataFetcher. +func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { + f.metrics.resetTx() + + // Check whether the user belongs to the shard. + if len(f.strategy.FilterUsers(ctx, []string{f.userID})) != 1 { + f.metrics.submit() + return nil, nil, nil + } + + // Track duration and sync counters only if the tenant wasn't filtered out by the sharding strategy. + start := time.Now() + defer func() { + f.metrics.syncDuration.Observe(time.Since(start).Seconds()) + if err != nil { + f.metrics.syncFailures.Inc() + } + }() + f.metrics.syncs.Inc() + + // Fetch the bucket index. + idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.logger) + if errors.Is(err, bucketindex.ErrIndexNotFound) { + // This is a legitimate case happening when the first blocks of a tenant have recently been uploaded by ingesters + // and their bucket index has not been created yet. + f.metrics.synced.WithLabelValues(noBucketIndex).Set(1) + f.metrics.submit() + + return nil, nil, nil + } + if errors.Is(err, bucketindex.ErrIndexCorrupted) { + // In case a single tenant bucket index is corrupted, we don't want the store-gateway to fail at startup + // because it's unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query + // will fail anyway in the querier (the querier fails if the bucket index is corrupted). + level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err) + f.metrics.synced.WithLabelValues(corruptedBucketIndex).Set(1) + f.metrics.submit() + + return nil, nil, nil + } + if err != nil { + f.metrics.synced.WithLabelValues(failedMeta).Set(1) + f.metrics.submit() + + return nil, nil, errors.Wrapf(err, "read bucket index") + } + + // Build block metas out of the index. + metas = make(map[ulid.ULID]*metadata.Meta, len(idx.Blocks)) + for _, b := range idx.Blocks { + metas[b.ID] = b.ThanosMeta(f.userID) + } + + for _, filter := range f.filters { + var err error + + // NOTE: a filter can update the synced metric according to the reason for the exclusion. + if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok { + err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.synced) + } else { + err = filter.Filter(ctx, metas, f.metrics.synced) + } + + if err != nil { + return nil, nil, errors.Wrap(err, "filter metas") + } + } + + for _, m := range f.modifiers { + // NOTE: a modifier can update the modified metric according to the reason for the modification.
+ if err := m.Modify(ctx, metas, f.metrics.modified); err != nil { + return nil, nil, errors.Wrap(err, "modify metas") + } + } + + f.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) + f.metrics.submit() + + return metas, nil, nil +} + +func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Meta, error)) { + // Unused by the store-gateway. + callback(nil, errors.New("UpdateOnChange is unsupported")) +} + +const ( + fetcherSubSys = "blocks_meta" + + corruptedMeta = "corrupted-meta-json" + noMeta = "no-meta-json" + loadedMeta = "loaded" + failedMeta = "failed" + corruptedBucketIndex = "corrupted-bucket-index" + noBucketIndex = "no-bucket-index" + + // Synced label values. + labelExcludedMeta = "label-excluded" + timeExcludedMeta = "time-excluded" + tooFreshMeta = "too-fresh" + duplicateMeta = "duplicate" + // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, + // but don't have a replacement block yet. + markedForDeletionMeta = "marked-for-deletion" + + // MarkedForNoCompactionMeta is the label for blocks which are loaded but also marked for no compaction. This label is also counted in the `loaded` label metric. + MarkedForNoCompactionMeta = "marked-for-no-compact" + + // Modified label values. + replicaRemovedMeta = "replica-label-removed" +) + +// fetcherMetrics is a copy of Thanos internal fetcherMetrics. These metrics have been copied from +// Thanos in order to track exactly the same metrics in our own custom metadata fetcher implementation. +type fetcherMetrics struct { + syncs prometheus.Counter + syncFailures prometheus.Counter + syncDuration prometheus.Histogram + + synced *extprom.TxGaugeVec + modified *extprom.TxGaugeVec +} + +func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { + var m fetcherMetrics + + m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Subsystem: fetcherSubSys, + Name: "syncs_total", + Help: "Total blocks metadata synchronization attempts", + }) + m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Subsystem: fetcherSubSys, + Name: "sync_failures_total", + Help: "Total blocks metadata synchronization failures", + }) + m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Subsystem: fetcherSubSys, + Name: "sync_duration_seconds", + Help: "Duration of the blocks metadata synchronization in seconds", + Buckets: []float64{0.01, 1, 10, 100, 1000}, + }) + m.synced = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "synced", + Help: "Number of block metadata synced", + }, + []string{"state"}, + []string{corruptedMeta}, + []string{corruptedBucketIndex}, + []string{noMeta}, + []string{noBucketIndex}, + []string{loadedMeta}, + []string{tooFreshMeta}, + []string{failedMeta}, + []string{labelExcludedMeta}, + []string{timeExcludedMeta}, + []string{duplicateMeta}, + []string{markedForDeletionMeta}, + []string{MarkedForNoCompactionMeta}, + ) + m.modified = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "modified", + Help: "Number of blocks whose metadata changed", + }, + []string{"modified"}, + []string{replicaRemovedMeta}, + ) + return &m +} + +func (s *fetcherMetrics) submit() { + s.synced.Submit() + s.modified.Submit() +} + +func (s *fetcherMetrics) resetTx() { + s.synced.ResetTx() + s.modified.ResetTx() +} diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go
b/pkg/storegateway/bucket_index_metadata_fetcher_test.go new file mode 100644 index 00000000000..f996b745185 --- /dev/null +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -0,0 +1,323 @@ +package storegateway + +import ( + "bytes" + "context" + "path" + "strings" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/util/concurrency" +) + +func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { + const userID = "user-1" + + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + now := time.Now() + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + + // Create a bucket index. + block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil)} + block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil)} + block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil)} + mark1 := &bucketindex.BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold. + mark2 := &bucketindex.BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold. + + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + Blocks: bucketindex.Blocks{block1, block2, block3}, + BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark1, mark2}, + UpdatedAt: now.Unix(), + })) + + // Create a metadata fetcher with filters. 
+ filters := []block.MetadataFilter{ + NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt), 2*time.Hour, 1), + } + + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), logger, reg, filters, nil) + metas, partials, err := fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Equal(t, map[ulid.ULID]*metadata.Meta{ + block1.ID: block1.ThanosMeta(userID), + block3.ID: block3.ThanosMeta(userID), + }, metas) + assert.Empty(t, partials) + assert.Empty(t, logs) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_modified gauge + blocks_meta_modified{modified="replica-label-removed"} 0 + + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 2 + blocks_meta_synced{state="marked-for-deletion"} 1 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), + "blocks_meta_modified", + "blocks_meta_sync_failures_total", + "blocks_meta_synced", + "blocks_meta_syncs_total", + )) +} + +func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { + const userID = "user-1" + + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), logger, reg, nil, nil) + metas, partials, err := fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Empty(t, metas) + assert.Empty(t, partials) + assert.Empty(t, logs) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_modified gauge + blocks_meta_modified{modified="replica-label-removed"} 0 + + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 1 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata 
synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), + "blocks_meta_modified", + "blocks_meta_sync_failures_total", + "blocks_meta_synced", + "blocks_meta_syncs_total", + )) +} + +func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) { + const userID = "user-1" + + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + + // Upload a corrupted bucket index. + require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!"))) + + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), logger, reg, nil, nil) + metas, partials, err := fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Empty(t, metas) + assert.Empty(t, partials) + assert.Regexp(t, "corrupted bucket index found", logs) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_modified gauge + blocks_meta_modified{modified="replica-label-removed"} 0 + + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 1 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), + "blocks_meta_modified", + "blocks_meta_sync_failures_total", + "blocks_meta_synced", + "blocks_meta_syncs_total", + )) +} + +func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) { + const userID = "user-1" + + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + now := time.Now() + logger := log.NewNopLogger() + strategy := &mockShardingStrategy{} + strategy.On("FilterUsers", mock.Anything, mock.Anything).Return([]string{userID}) + + // Corrupted bucket index. 
+ require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!"))) + + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, strategy, logger, reg, nil, nil) + metas, _, err := fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Len(t, metas, 0) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 1 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + `), "blocks_meta_synced")) + + // No bucket index. + require.NoError(t, bucketindex.DeleteIndex(ctx, bkt, userID)) + + metas, _, err = fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Len(t, metas, 0) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 1 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + `), "blocks_meta_synced")) + + // Create a bucket index. + block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil)} + block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil)} + block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil)} + + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + Blocks: bucketindex.Blocks{block1, block2, block3}, + UpdatedAt: now.Unix(), + })) + + metas, _, err = fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Len(t, metas, 3) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 3 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + `), "blocks_meta_synced")) + + // Remove the tenant from the shard. 
+ strategy = &mockShardingStrategy{} + strategy.On("FilterUsers", mock.Anything, mock.Anything).Return([]string{}) + fetcher.strategy = strategy + + metas, _, err = fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Len(t, metas, 0) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + `), "blocks_meta_synced")) +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index cecd68e1971..ed67e1f8861 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -329,45 +329,64 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro level.Info(userLogger).Log("msg", "creating user bucket store") userBkt := bucket.NewUserBucketClient(userID, u.bucket) + fetcherReg := prometheus.NewRegistry() - // Wrap the bucket reader to skip iterating the bucket at all if the user doesn't - // belong to the store-gateway shard. We need to run the BucketStore synching anyway - // in order to unload previous tenants in case of a resharding leading to tenants - // moving out from the store-gateway shard and also make sure both MetaFetcher and - // BucketStore metrics are correctly updated. - fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt) + // The sharding strategy filter MUST be before the ones we create here (order matters). + filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{ + block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg), + // Use our own custom implementation. + NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency), + // The duplicate filter has been intentionally omitted because it could cause troubles with + // the consistency check done on the querier. The duplicate filter removes redundant blocks + // but if the store-gateway removes redundant blocks before the querier discovers them, the + // consistency check on the querier will fail. + }...) + + modifiers := []block.MetadataModifier{ + // Remove Cortex external labels so that they're not injected when querying blocks. + NewReplicaLabelRemover(userLogger, []string{ + tsdb.TenantIDExternalLabel, + tsdb.IngesterIDExternalLabel, + tsdb.ShardIDExternalLabel, + }), + } - fetcherReg := prometheus.NewRegistry() - fetcher, err := block.NewMetaFetcher( - userLogger, - u.cfg.BucketStore.MetaSyncConcurrency, - fetcherBkt, - filepath.Join(u.cfg.BucketStore.SyncDir, userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory - fetcherReg, - // The sharding strategy filter MUST be before the ones we create here (order matters). 
- append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{ - block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg), - block.NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency), - // The duplicate filter has been intentionally omitted because it could cause troubles with - // the consistency check done on the querier. The duplicate filter removes redundant blocks - // but if the store-gateway removes redundant blocks before the querier discovers them, the - // consistency check on the querier will fail. - }...), - []block.MetadataModifier{ - // Remove Cortex external labels so that they're not injected when querying blocks. - NewReplicaLabelRemover(userLogger, []string{ - tsdb.TenantIDExternalLabel, - tsdb.IngesterIDExternalLabel, - tsdb.ShardIDExternalLabel, - }), - }, - ) - if err != nil { - return nil, err + // Instantiate a different blocks metadata fetcher based on whether the bucket index is enabled. + var fetcher block.MetadataFetcher + if u.cfg.BucketStore.BucketIndex.Enabled { + fetcher = NewBucketIndexMetadataFetcher( + userID, + u.bucket, + u.shardingStrategy, + u.logger, + fetcherReg, + filters, + modifiers) + } else { + // Wrap the bucket reader to skip iterating the bucket at all if the user doesn't + // belong to the store-gateway shard. We need to run the BucketStore syncing anyway + // in order to unload previous tenants in case of a resharding leading to tenants + // moving out from the store-gateway shard and also make sure both MetaFetcher and + // BucketStore metrics are correctly updated. + fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt) + + var err error + fetcher, err = block.NewMetaFetcher( + userLogger, + u.cfg.BucketStore.MetaSyncConcurrency, + fetcherBkt, + filepath.Join(u.cfg.BucketStore.SyncDir, userID), // The fetcher stores cached metas in the "meta-syncer/" subdirectory + fetcherReg, + filters, + modifiers, + ) + if err != nil { + return nil, err + } } bucketStoreReg := prometheus.NewRegistry() - bs, err = store.NewBucketStore( + bs, err := store.NewBucketStore( userLogger, bucketStoreReg, userBkt, diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 40af568b00c..41ec6067e21 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -23,8 +23,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -33,6 +35,8 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -212,9 +216,7 @@ func TestStoreGateway_InitialSyncFailure(t *testing.T) { } func TestStoreGateway_BlocksSharding(t *testing.T) { - storageDir, err :=
ioutil.TempDir(os.TempDir(), "") - require.NoError(t, err) - defer os.RemoveAll(storageDir) //nolint:errcheck + bucketClient, storageDir := cortex_testutil.PrepareFilesystemBucket(t) // This tests uses real TSDB blocks. 24h time range, 2h block range period, // 2 users = total (24 / 12) * 2 = 24 blocks. @@ -224,8 +226,10 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { require.NoError(t, mockTSDB(path.Join(storageDir, "user-1"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000)) require.NoError(t, mockTSDB(path.Join(storageDir, "user-2"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000)) - bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - require.NoError(t, err) + // Write the bucket index. + for _, userID := range []string{"user-1", "user-2"} { + createBucketIndex(t, bucketClient, userID) + } tests := map[string]struct { shardingStrategy string // Empty string means disabled. @@ -292,83 +296,86 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { } for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - ctx := context.Background() - storageCfg, cleanup := mockStorageConfig(t) - storageCfg.BucketStore.SyncInterval = time.Hour // Do not trigger the periodic sync in this test (we explicitly sync stores). - defer cleanup() - ringStore := consul.NewInMemoryClient(ring.GetCodec()) + for _, bucketIndexEnabled := range []bool{true, false} { + t.Run(fmt.Sprintf("%s (bucket index enabled = %v)", testName, bucketIndexEnabled), func(t *testing.T) { + ctx := context.Background() + storageCfg, cleanup := mockStorageConfig(t) + storageCfg.BucketStore.SyncInterval = time.Hour // Do not trigger the periodic sync in this test (we explicitly sync stores). + storageCfg.BucketStore.BucketIndex.Enabled = bucketIndexEnabled + defer cleanup() + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + + // Start the configured number of gateways. + var gateways []*StoreGateway + var gatewayIds []string + registries := util.NewUserRegistries() + + for i := 1; i <= testData.numGateways; i++ { + instanceID := fmt.Sprintf("gateway-%d", i) + + limits := defaultLimitsConfig() + gatewayCfg := mockGatewayConfig() + gatewayCfg.ShardingRing.ReplicationFactor = testData.replicationFactor + gatewayCfg.ShardingRing.InstanceID = instanceID + gatewayCfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + gatewayCfg.ShardingRing.RingCheckPeriod = time.Hour // Do not check the ring topology changes in this test (we explicitly sync stores). + + if testData.shardingStrategy == "" { + gatewayCfg.ShardingEnabled = false + } else { + gatewayCfg.ShardingEnabled = true + gatewayCfg.ShardingStrategy = testData.shardingStrategy + limits.StoreGatewayTenantShardSize = testData.tenantShardSize + } - // Start the configure number of gateways.
- var gateways []*StoreGateway - var gatewayIds []string - registries := util.NewUserRegistries() + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) - for i := 1; i <= testData.numGateways; i++ { - instanceID := fmt.Sprintf("gateway-%d", i) + reg := prometheus.NewPedanticRegistry() + g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, overrides, mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck - limits := defaultLimitsConfig() - gatewayCfg := mockGatewayConfig() - gatewayCfg.ShardingRing.ReplicationFactor = testData.replicationFactor - gatewayCfg.ShardingRing.InstanceID = instanceID - gatewayCfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) - gatewayCfg.ShardingRing.RingCheckPeriod = time.Hour // Do not check the ring topology changes in this test (we explicitly sync stores). + require.NoError(t, services.StartAndAwaitRunning(ctx, g)) - if testData.shardingStrategy == "" { - gatewayCfg.ShardingEnabled = false - } else { - gatewayCfg.ShardingEnabled = true - gatewayCfg.ShardingStrategy = testData.shardingStrategy - limits.StoreGatewayTenantShardSize = testData.tenantShardSize + gateways = append(gateways, g) + gatewayIds = append(gatewayIds, instanceID) + registries.AddUserRegistry(instanceID, reg) } - overrides, err := validation.NewOverrides(limits, nil) - require.NoError(t, err) - - reg := prometheus.NewPedanticRegistry() - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, overrides, mockLoggingLevel(), log.NewNopLogger(), reg) - require.NoError(t, err) - defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck - - require.NoError(t, services.StartAndAwaitRunning(ctx, g)) + // Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions). + if testData.shardingStrategy != "" { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - gateways = append(gateways, g) - gatewayIds = append(gatewayIds, instanceID) - registries.AddUserRegistry(instanceID, reg) - } - - // Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions). - if testData.shardingStrategy != "" { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - // A gateway is ready for the test once it sees all instances ACTIVE in the ring. - for _, g := range gateways { - for _, instanceID := range gatewayIds { - require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE)) + // A gateway is ready for the test once it sees all instances ACTIVE in the ring. + for _, g := range gateways { + for _, instanceID := range gatewayIds { + require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE)) + } } } - } - // Re-sync the stores because the ring topology has changed in the meanwhile - // (when the 1st gateway has synched the 2nd gateway didn't run yet). - for _, g := range gateways { - g.syncStores(ctx, syncReasonRingChange) - } + // Re-sync the stores because the ring topology has changed in the meantime + // (when the 1st gateway synced, the 2nd gateway wasn't running yet). + for _, g := range gateways { + g.syncStores(ctx, syncReasonRingChange) + } - // Assert on the number of blocks loaded extracting this information from metrics.
- metrics := registries.BuildMetricFamiliesPerUser() - assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded")) - assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered")) + // Assert on the number of blocks loaded, extracting this information from metrics. + metrics := registries.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded")) + assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered")) - if testData.shardingStrategy == util.ShardingStrategyShuffle { - assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) - assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) - } else { - assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) - assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) - } - }) + if testData.shardingStrategy == util.ShardingStrategyShuffle { + assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) + assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) + } else { + assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) + assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) + } + }) + } } } @@ -646,10 +653,14 @@ func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) { bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) + createBucketIndex(t, bucketClient, userID) + // Find the created blocks (we expect 2). var blockIDs []string require.NoError(t, bucketClient.Iter(ctx, "user-1/", func(key string) error { - blockIDs = append(blockIDs, strings.TrimSuffix(strings.TrimPrefix(key, userID+"/"), "/")) + if _, ok := block.IsBlockDir(key); ok { + blockIDs = append(blockIDs, strings.TrimSuffix(strings.TrimPrefix(key, userID+"/"), "/")) + } return nil })) require.Len(t, blockIDs, 2) @@ -669,45 +680,50 @@ func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) { require.NoError(t, err) } - // Create a store-gateway used to query back the series from the blocks. - gatewayCfg := mockGatewayConfig() - gatewayCfg.ShardingEnabled = false - storageCfg, cleanup := mockStorageConfig(t) - defer cleanup() + for _, bucketIndexEnabled := range []bool{true, false} { + t.Run(fmt.Sprintf("bucket index enabled = %v", bucketIndexEnabled), func(t *testing.T) { + // Create a store-gateway used to query back the series from the blocks.
+ gatewayCfg := mockGatewayConfig() + gatewayCfg.ShardingEnabled = false + storageCfg, cleanup := mockStorageConfig(t) + storageCfg.BucketStore.BucketIndex.Enabled = bucketIndexEnabled + defer cleanup() - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, defaultLimitsOverrides(t), mockLoggingLevel(), logger, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(ctx, g)) - defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck + g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, defaultLimitsOverrides(t), mockLoggingLevel(), logger, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, g)) + defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck - // Query back all series. - req := &storepb.SeriesRequest{ - MinTime: minT, - MaxTime: maxT, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"}, - }, - } + // Query back all series. + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"}, + }, + } - srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) - err = g.Series(req, srv) - require.NoError(t, err) - assert.Empty(t, srv.Warnings) - assert.Len(t, srv.SeriesSet, numSeries) + srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + err = g.Series(req, srv) + require.NoError(t, err) + assert.Empty(t, srv.Warnings) + assert.Len(t, srv.SeriesSet, numSeries) - for seriesID := 0; seriesID < numSeries; seriesID++ { - actual := srv.SeriesSet[seriesID] + for seriesID := 0; seriesID < numSeries; seriesID++ { + actual := srv.SeriesSet[seriesID] - // Ensure Cortex external labels have been removed. - assert.Equal(t, []labelpb.ZLabel{{Name: "series_id", Value: strconv.Itoa(seriesID)}}, actual.Labels) + // Ensure Cortex external labels have been removed. + assert.Equal(t, []labelpb.ZLabel{{Name: "series_id", Value: strconv.Itoa(seriesID)}}, actual.Labels) - // Ensure samples have been correctly queried. The Thanos store also deduplicate samples - in most cases, but it's not strictly required guaranteeing deduplication at this stage. - samples, err := readSamplesFromChunks(actual.Chunks) - require.NoError(t, err) - assert.Equal(t, []sample{ - {ts: minT + (step * int64(seriesID)), value: float64(seriesID)}, - }, samples) + // Ensure samples have been correctly queried. The Thanos store also deduplicates samples + // in most cases, but it's not strictly required to guarantee deduplication at this stage.
+ samples, err := readSamplesFromChunks(actual.Chunks) + require.NoError(t, err) + assert.Equal(t, []sample{ + {ts: minT + (step * int64(seriesID)), value: float64(seriesID)}, + }, samples) + } + }) } } @@ -939,3 +955,12 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, args := m.Called(ctx, userID, metas, synced) return args.Error(0) } + +func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index { + updater := bucketindex.NewUpdater(bkt, userID, log.NewNopLogger()) + idx, _, err := updater.UpdateIndex(context.Background(), nil) + require.NoError(t, err) + require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, idx)) + + return idx +} diff --git a/pkg/storegateway/metadata_fetcher_filters.go b/pkg/storegateway/metadata_fetcher_filters.go new file mode 100644 index 00000000000..7bd8693dd48 --- /dev/null +++ b/pkg/storegateway/metadata_fetcher_filters.go @@ -0,0 +1,78 @@ +package storegateway + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" +) + +type MetadataFilterWithBucketIndex interface { + // FilterWithBucketIndex is like Thanos MetadataFilter.Filter() but it also receives the bucket index as input. + FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced *extprom.TxGaugeVec) error +} + +// IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements +// the MetadataFilterWithBucketIndex interface. +type IgnoreDeletionMarkFilter struct { + upstream *block.IgnoreDeletionMarkFilter + + delay time.Duration + deletionMarkMap map[ulid.ULID]*metadata.DeletionMark +} + +// NewIgnoreDeletionMarkFilter creates an IgnoreDeletionMarkFilter. +func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter { + return &IgnoreDeletionMarkFilter{ + upstream: block.NewIgnoreDeletionMarkFilter(logger, bkt, delay, concurrency), + delay: delay, + } +} + +// DeletionMarkBlocks returns blocks that were marked for deletion. +func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark { + // If the cached deletion marks exist, it means the filter function was called with the bucket + // index, so it's safe to return it. + if f.deletionMarkMap != nil { + return f.deletionMarkMap + } + + return f.upstream.DeletionMarkBlocks() +} + +// Filter implements block.MetadataFilter. +func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + return f.upstream.Filter(ctx, metas, synced) +} + +// FilterWithBucketIndex implements MetadataFilterWithBucketIndex. +func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced *extprom.TxGaugeVec) error { + // Build a map of the block deletion marks. + marks := make(map[ulid.ULID]*metadata.DeletionMark, len(idx.BlockDeletionMarks)) + for _, mark := range idx.BlockDeletionMarks { + marks[mark.ID] = mark.ThanosDeletionMark() + } + + // Keep it cached.
+ f.deletionMarkMap = marks + + for _, mark := range marks { + if _, ok := metas[mark.ID]; !ok { + continue + } + + if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { + synced.WithLabelValues(markedForDeletionMeta).Inc() + delete(metas, mark.ID) + } + } + + return nil +} diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go new file mode 100644 index 00000000000..57736bb149b --- /dev/null +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -0,0 +1,106 @@ +package storegateway + +import ( + "bytes" + "context" + "encoding/json" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { + testIgnoreDeletionMarkFilter(t, false) +} + +func TestIgnoreDeletionMarkFilter_FilterWithBucketIndex(t *testing.T) { + testIgnoreDeletionMarkFilter(t, true) +} + +func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { + const userID = "user-1" + + now := time.Now() + ctx := context.Background() + logger := log.NewNopLogger() + + // Create a bucket backed by filesystem. + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + bkt = bucketindex.BucketWithGlobalMarkers(bkt) + userBkt := bucket.NewUserBucketClient(userID, bkt) + + shouldFetch := &metadata.DeletionMark{ + ID: ulid.MustNew(1, nil), + DeletionTime: now.Add(-15 * time.Hour).Unix(), + Version: 1, + } + + shouldIgnore := &metadata.DeletionMark{ + ID: ulid.MustNew(2, nil), + DeletionTime: now.Add(-60 * time.Hour).Unix(), + Version: 1, + } + + var buf bytes.Buffer + require.NoError(t, json.NewEncoder(&buf).Encode(&shouldFetch)) + require.NoError(t, userBkt.Upload(ctx, path.Join(shouldFetch.ID.String(), metadata.DeletionMarkFilename), &buf)) + require.NoError(t, json.NewEncoder(&buf).Encode(&shouldIgnore)) + require.NoError(t, userBkt.Upload(ctx, path.Join(shouldIgnore.ID.String(), metadata.DeletionMarkFilename), &buf)) + require.NoError(t, userBkt.Upload(ctx, path.Join(ulid.MustNew(3, nil).String(), metadata.DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + + // Create the bucket index if required. 
+ var idx *bucketindex.Index + if bucketIndexEnabled { + var err error + + u := bucketindex.NewUpdater(bkt, userID, logger) + idx, _, err = u.UpdateIndex(ctx, nil) + require.NoError(t, err) + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, idx)) + } + + inputMetas := map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(1, nil): {}, + ulid.MustNew(2, nil): {}, + ulid.MustNew(3, nil): {}, + ulid.MustNew(4, nil): {}, + } + + expectedMetas := map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(1, nil): {}, + ulid.MustNew(3, nil): {}, + ulid.MustNew(4, nil): {}, + } + + expectedDeletionMarks := map[ulid.ULID]*metadata.DeletionMark{ + ulid.MustNew(1, nil): shouldFetch, + ulid.MustNew(2, nil): shouldIgnore, + } + + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"}) + f := NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(userBkt), 48*time.Hour, 32) + + if bucketIndexEnabled { + require.NoError(t, f.FilterWithBucketIndex(ctx, inputMetas, idx, synced)) + } else { + require.NoError(t, f.Filter(ctx, inputMetas, synced)) + } + + assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta))) + assert.Equal(t, expectedMetas, inputMetas) + assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks()) +} diff --git a/pkg/storegateway/metadata_fetcher_metrics.go b/pkg/storegateway/metadata_fetcher_metrics.go index 113dd616235..65391db0c38 100644 --- a/pkg/storegateway/metadata_fetcher_metrics.go +++ b/pkg/storegateway/metadata_fetcher_metrics.go @@ -27,6 +27,8 @@ func NewMetadataFetcherMetrics() *MetadataFetcherMetrics { return &MetadataFetcherMetrics{ regs: util.NewUserRegistries(), + // When mapping new metadata fetcher metrics from Thanos, please remember to add these metrics + // to our internal fetcherMetrics implementation too. syncs: prometheus.NewDesc( "cortex_blocks_meta_syncs_total", "Total blocks metadata synchronization attempts", diff --git a/website/static/images/blocks-storage/bucket-index-querier-logic.png b/website/static/images/blocks-storage/bucket-index-querier-logic.png deleted file mode 100644 index baaa2865714..00000000000 Binary files a/website/static/images/blocks-storage/bucket-index-querier-logic.png and /dev/null differ diff --git a/website/static/images/blocks-storage/bucket-index-querier-workflow.png b/website/static/images/blocks-storage/bucket-index-querier-workflow.png new file mode 100644 index 00000000000..6a1d81bec97 Binary files /dev/null and b/website/static/images/blocks-storage/bucket-index-querier-workflow.png differ
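For reviewers, the following test-style sketch (not part of the patch) shows how the pieces introduced above fit together end to end: the new PrepareFilesystemBucket() test helper, the bucket index updater, and the store-gateway's BucketIndexMetadataFetcher. Everything used here appears in the diff above except the test name, which is hypothetical.

package storegateway

import (
	"context"
	"testing"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
)

func TestSketch_FetchViaBucketIndex(t *testing.T) {
	const userID = "user-1"
	ctx := context.Background()
	logger := log.NewNopLogger()

	// Filesystem-backed bucket from the new shared test helper (cleanup is registered on t).
	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

	// Build and upload the per-tenant bucket index, as the compactor does in production.
	idx, _, err := bucketindex.NewUpdater(bkt, userID, logger).UpdateIndex(ctx, nil)
	require.NoError(t, err)
	require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, idx))

	// Discover the tenant's blocks through the index, without scanning the bucket.
	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), logger, prometheus.NewPedanticRegistry(), nil, nil)
	metas, partials, err := fetcher.Fetch(ctx)
	require.NoError(t, err)
	require.Empty(t, partials) // Fetch() never reports partial blocks when reading from the index.
	require.Empty(t, metas)    // The bucket is empty in this sketch, so no blocks are discovered.
}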
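The key design point in BucketIndexMetadataFetcher.Fetch() is that it reuses the existing Thanos MetadataFilter chain and upgrades a filter to the index-aware path only when the filter opts in via MetadataFilterWithBucketIndex. Condensed as a standalone helper for clarity (a restatement, assuming the same package and imports as bucket_index_metadata_fetcher.go; applyFilter itself is not in the patch):

// applyFilter shows the per-filter dispatch done by BucketIndexMetadataFetcher.Fetch():
// index-aware filters (such as the custom IgnoreDeletionMarkFilter above) receive the
// bucket index and can skip per-block object storage reads; all other filters run unchanged.
func applyFilter(ctx context.Context, f block.MetadataFilter, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced *extprom.TxGaugeVec) error {
	if customFilter, ok := f.(MetadataFilterWithBucketIndex); ok {
		return customFilter.FilterWithBucketIndex(ctx, metas, idx, synced)
	}
	return f.Filter(ctx, metas, synced)
}

This is also why the patch keeps the sharding strategy filter first in the chain built in bucket_stores.go: ordering of the filters is preserved regardless of which code path each filter takes.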
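Similarly, the deletion-mark semantics are preserved across both paths: a block whose deletion mark is older than -blocks-storage.bucket-store.ignore-deletion-marks-delay is dropped whether the marks come from a bucket scan or from the bucket index. A condensed sketch of the index path, assuming the same imports as metadata_fetcher_filters_test.go above (the test name is hypothetical):

func TestSketch_IgnoreOldDeletionMarksViaBucketIndex(t *testing.T) {
	const userID = "user-1"
	ctx := context.Background()
	logger := log.NewNopLogger()

	// Wrap the bucket so deletion marks are mirrored to the per-tenant global markers/
	// location, which is what the bucket index updater scans.
	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
	bkt = bucketindex.BucketWithGlobalMarkers(bkt)
	userBkt := bucket.NewUserBucketClient(userID, bkt)

	// Upload a deletion mark older than the 48h delay used below.
	mark := &metadata.DeletionMark{ID: ulid.MustNew(1, nil), DeletionTime: time.Now().Add(-60 * time.Hour).Unix(), Version: 1}
	var buf bytes.Buffer
	require.NoError(t, json.NewEncoder(&buf).Encode(mark))
	require.NoError(t, userBkt.Upload(ctx, path.Join(mark.ID.String(), metadata.DeletionMarkFilename), &buf))

	// Build the bucket index, which picks up the deletion mark.
	idx, _, err := bucketindex.NewUpdater(bkt, userID, logger).UpdateIndex(ctx, nil)
	require.NoError(t, err)

	// Filtering via the index drops the block without reading any deletion-mark.json from storage.
	metas := map[ulid.ULID]*metadata.Meta{ulid.MustNew(1, nil): {}}
	synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"})
	f := NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(userBkt), 48*time.Hour, 32)
	require.NoError(t, f.FilterWithBucketIndex(ctx, metas, idx, synced))
	require.Empty(t, metas)
}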