diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2a08cfbbd9..b288a43a4c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,11 @@
* [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583
* [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers and store-gateways. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583
+* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier. When enabled, the querier will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics have been added: #3614
+ * `cortex_bucket_index_loads_total`
+ * `cortex_bucket_index_load_failures_total`
+ * `cortex_bucket_index_load_duration_seconds`
+ * `cortex_bucket_index_loaded`
* [ENHANCEMENT] Compactor: exported the following metrics. #3583
* `cortex_bucket_blocks_count`: Total number of blocks per tenant in the bucket. Includes blocks marked for deletion.
* `cortex_bucket_blocks_marked_for_deletion_count`: Total number of blocks per tenant marked for deletion in the bucket.
diff --git a/docs/blocks-storage/_index.md b/docs/blocks-storage/_index.md
index 6d5c87275c7..e0541f47bf3 100644
--- a/docs/blocks-storage/_index.md
+++ b/docs/blocks-storage/_index.md
@@ -29,7 +29,7 @@ When running the Cortex blocks storage, the Cortex architecture doesn't signific
The **[store-gateway](./store-gateway.md)** is responsible to query blocks and is used by the [querier](./querier.md) at query time. The store-gateway is required when running the blocks storage.
-The **[compactor](./compactor.md)** is responsible to merge and deduplicate smaller blocks into larger ones, in order to reduce the number of blocks stored in the long-term storage for a given tenant and query them more efficiently. It also keeps the bucket index updated and, for this reason, it's a required component.
+The **[compactor](./compactor.md)** is responsible to merge and deduplicate smaller blocks into larger ones, in order to reduce the number of blocks stored in the long-term storage for a given tenant and query them more efficiently. It also keeps the [bucket index](./bucket-index.md) updated and, for this reason, it's a required component.
Finally, the [**table-manager**](../chunks-storage/table-manager.md) and the [**schema config**](../chunks-storage/schema-config.md) are **not used** by the blocks storage.
diff --git a/docs/blocks-storage/bucket-index.md b/docs/blocks-storage/bucket-index.md
new file mode 100644
index 00000000000..ce006e939ed
--- /dev/null
+++ b/docs/blocks-storage/bucket-index.md
@@ -0,0 +1,57 @@
+---
+title: "Bucket Index"
+linkTitle: "Bucket Index"
+weight: 5
+slug: bucket-index
+---
+
+The bucket index is a **per-tenant file containing the list of blocks and block deletion marks** in the storage. The bucket index itself is stored in the backend object storage, is periodically updated by the compactor and used by queriers to discover blocks in the storage.
+
+The bucket index usage is **optional** and can be enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true` (or its respective YAML config option).
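+
+For example, the same option can be set via the YAML configuration file as follows (a minimal sketch; the full list of related options is documented in the [querier](./querier.md) configuration reference):
+
+```yaml
+blocks_storage:
+  bucket_store:
+    bucket_index:
+      enabled: true
+```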
+
+## Benefits
+
+The [querier](./querier.md) needs to have an almost up-to-date view over the entire storage bucket, in order to find the right blocks to look up at query time. Because of this, the querier needs to periodically scan the bucket to look for new blocks uploaded by ingesters or the compactor, and for blocks deleted (or marked for deletion) by the compactor.
+
+When the bucket index is enabled, the querier periodically looks up the per-tenant bucket index instead of scanning the bucket via "list objects" operations. This brings a few benefits:
+
+1. Reduced number of API calls to the object storage by the querier
+2. No "list objects" storage API calls issued by the querier
+3. The [querier](./querier.md) is up and running immediately after startup (no need to run an initial bucket scan)
+
+## Structure of the index
+
+The `bucket-index.json.gz` contains the following (an abridged example is shown after this list):
+
+- **`blocks`**
+ List of complete blocks of a tenant, including blocks marked for deletion (partial blocks are excluded from the index).
+- **`block_deletion_marks`**
+ List of block deletion marks.
+- **`updated_at`**
+  Unix timestamp (seconds precision) of when the index was last updated (written to the storage).
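+
+For illustration only, the decompressed content of the bucket index is a JSON document shaped like the following abridged sketch (the entries within the two lists are elided here):
+
+```json
+{
+  "blocks": [ ... ],
+  "block_deletion_marks": [ ... ],
+  "updated_at": 1606312800
+}
+```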
+
+## How it gets updated
+
+The [compactor](./compactor.md) periodically scans the bucket and uploads an updated bucket index to the storage. The frequency at which the bucket index is updated can be configured via `-compactor.cleanup-interval`.
+
+Even though using the bucket index is optional, the index itself is built and kept updated by the compactor even if `-blocks-storage.bucket-store.bucket-index.enabled` has **not** been enabled. This is intentional, so that once a Cortex cluster operator decides to enable the bucket index in a live cluster, the bucket index for every tenant already exists and query results consistency is guaranteed. The overhead introduced by keeping the bucket index updated is expected to be negligible.
+
+## How it's used by the querier
+
+The [querier](./querier.md), at query time, checks whether the bucket index for the tenant has already been loaded in memory. If not, the querier downloads it from the storage and caches it in memory.
+
+_Given it's a small file, lazily downloading it doesn't significantly impact first query performance, but it allows a querier to get up and running without pre-downloading every tenant's bucket index. Moreover, if the [metadata cache](./querier.md#metadata-cache) is enabled, the bucket index will be cached for a short time in a shared cache, reducing the actual latency and the number of API calls to the object storage when multiple queriers fetch the same tenant's bucket index within a short time._
+
+While in-memory, a background process will keep it **updated at periodic intervals**, so that subsequent queries from the same tenant to the same querier instance will use the cached (and periodically updated) bucket index. There are two config options involved:
+
+- `-blocks-storage.bucket-store.bucket-index.update-on-stale-interval`
+ This option configures how frequently a cached bucket index should be refreshed.
+- `-blocks-storage.bucket-store.bucket-index.update-on-error-interval`
+  If downloading a bucket index fails, the failure is cached for a short time in order to avoid hammering the backend storage. This option configures how frequently the querier retries loading a bucket index which previously failed to load.
+
+If a bucket index is unused for a long time (configurable via `-blocks-storage.bucket-store.bucket-index.idle-timeout`), e.g. because that querier instance is not receiving any query from the tenant, the querier will offload it and stop keeping it updated at regular intervals. This is particularly useful for tenants which are resharded to different queriers when [shuffle sharding](../guides/shuffle-sharding.md) is enabled.
+
+Finally, the querier, at query time, checks how old a bucket index is (based on its `updated_at`) and fails the query if its age is older than `-blocks-storage.bucket-store.bucket-index.max-stale-period`. This circuit breaker ensures queriers will not return any partial query results due to a stale view over the long-term storage.
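+
+To summarize, the querier-side options described in this page map to the following YAML configuration (a sketch with the bucket index enabled and every other option at its default value, as listed in the [querier](./querier.md) configuration reference):
+
+```yaml
+blocks_storage:
+  bucket_store:
+    bucket_index:
+      enabled: true
+      update_on_stale_interval: 15m
+      update_on_error_interval: 1m
+      idle_timeout: 1h
+      max_stale_period: 1h
+```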
diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index 2b151cb3e42..8393e69e29d 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -10,7 +10,7 @@ slug: compactor
The **compactor** is an service which is responsible to:
- Compact multiple blocks of a given tenant into a single optimized larger block. This helps to reduce storage costs (deduplication, index size reduction), and increase query speed (querying fewer blocks is faster).
-- Keep the per-tenant bucket index updated. The bucket index is used by [queriers](./querier.md) and [store-gateways](./store-gateway.md) to discover new blocks in the storage.
+- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) to discover new blocks in the storage.
The compactor is **stateless**.
diff --git a/docs/blocks-storage/compactor.template b/docs/blocks-storage/compactor.template
index 4ce41dad843..7fd64e60edc 100644
--- a/docs/blocks-storage/compactor.template
+++ b/docs/blocks-storage/compactor.template
@@ -10,7 +10,7 @@ slug: compactor
The **compactor** is an service which is responsible to:
- Compact multiple blocks of a given tenant into a single optimized larger block. This helps to reduce storage costs (deduplication, index size reduction), and increase query speed (querying fewer blocks is faster).
-- Keep the per-tenant bucket index updated. The bucket index is used by [queriers](./querier.md) and [store-gateways](./store-gateway.md) to discover new blocks in the storage.
+- Keep the per-tenant bucket index updated. The [bucket index](./bucket-index.md) is used by [queriers](./querier.md) to discover new blocks in the storage.
The compactor is **stateless**.
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index b255797332b..23163d0f893 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -13,12 +13,28 @@ The querier is **stateless**.
## How it works
-At startup **queriers** iterate over the entire storage bucket to discover all tenants blocks and download the `meta.json` for each block. During this initial bucket scanning phase, a querier is not ready to handle incoming queries yet and its `/ready` readiness probe endpoint will fail.
+The querier needs to have an almost up-to-date view over the entire storage bucket, in order to find the right blocks to look up at query time. The querier can keep the bucket view updated in two different ways:
+
+1. Periodically scanning the bucket (default)
+2. Periodically downloading the [bucket index](./bucket-index.md)
+
+### Bucket index disabled (default)
+
+At startup, **queriers** iterate over the entire storage bucket to discover all tenants' blocks and download the `meta.json` for each block. During this initial bucket scanning phase, a querier is not ready to handle incoming queries yet and its `/ready` readiness probe endpoint will fail.
While running, queriers periodically iterate over the storage bucket to discover new tenants and recently uploaded blocks. Queriers do **not** download any content from blocks except a small `meta.json` file containing the block's metadata (including the minimum and maximum timestamp of samples within the block).
Queriers use the metadata to compute the list of blocks that need to be queried at query time and fetch matching series from the [store-gateway](./store-gateway.md) instances holding the required blocks.
+### Bucket index enabled
+
+When the [bucket index](./bucket-index.md) is enabled, queriers lazily download the bucket index upon the first query received for a given tenant, cache it in memory and periodically keep it updated. The bucket index contains the list of blocks and block deletion marks of a tenant, which is then used during query execution to find the set of blocks that need to be queried.
+
+Since the bucket index removes the need to scan the bucket, it brings a few benefits:
+
+1. The querier is expected to be ready shortly after startup.
+2. Lower volume of API calls to object storage.
+
### Anatomy of a query request
When a querier receives a query range request, it contains the following parameters:
@@ -60,6 +76,7 @@ Caching is optional, but **highly recommended** in a production environment. Ple
- List of blocks per tenant
- Block's `meta.json` content
- Block's `deletion-mark.json` existence and content
+- Tenant's `bucket-index.json.gz` content
Using the metadata cache can significantly reduce the number of API calls to object storage and protects from linearly scale the number of these API calls with the number of querier and store-gateway instances (because the bucket is periodically scanned and synched by each querier and store-gateway).
@@ -341,8 +358,8 @@ blocks_storage:
# CLI flag: -blocks-storage.filesystem.dir
[dir: | default = ""]
- # This configures how the store-gateway synchronizes blocks stored in the
- # bucket.
+ # This configures how the querier and store-gateway discover and synchronize
+ # blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
@@ -579,7 +596,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-content-ttl
[metafile_content_ttl: | default = 24h]
- # Maximum size of metafile content to cache in bytes.
+ # Maximum size of metafile content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid
+ # network round trip for large content if the configured caching backend
+      # has a hard limit on cached items size (in this case, you should set
+ # this limit to the same limit in the caching backend).
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-max-size-bytes
[metafile_max_size_bytes: | default = 1048576]
@@ -587,6 +608,18 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: | default = 168h]
+ # How long to cache content of the bucket index.
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
+ [bucket_index_content_ttl: | default = 5m]
+
+ # Maximum size of bucket index content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid
+ # network round trip for large content if the configured caching backend
+      # has a hard limit on cached items size (in this case, you should set
+ # this limit to the same limit in the caching backend).
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
+ [bucket_index_max_size_bytes: | default = 1048576]
+
# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
@@ -596,6 +629,33 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: | default = 6h]
+ bucket_index:
+ # True to enable querier to discover blocks in the storage via bucket
+ # index instead of bucket scanning.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
+ [enabled: | default = false]
+
+ # How frequently a cached bucket index should be refreshed.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
+ [update_on_stale_interval: | default = 15m]
+
+ # How frequently a bucket index, which previously failed to load, should
+      # be retried.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
+ [update_on_error_interval: | default = 1m]
+
+      # How long an unused bucket index should be cached. Once this timeout
+ # expires, the unused bucket index is removed from the in-memory cache.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
+ [idle_timeout: | default = 1h]
+
+ # The maximum allowed age of a bucket index (last updated) before queries
+ # start failing because the bucket index is too old. The bucket index is
+ # periodically updated by the compactor, while this check is enforced in
+ # the querier (at query time).
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
+ [max_stale_period: | default = 1h]
+
tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
diff --git a/docs/blocks-storage/querier.template b/docs/blocks-storage/querier.template
index 97d59a25957..ee944d3d07a 100644
--- a/docs/blocks-storage/querier.template
+++ b/docs/blocks-storage/querier.template
@@ -13,12 +13,28 @@ The querier is **stateless**.
## How it works
-At startup **queriers** iterate over the entire storage bucket to discover all tenants blocks and download the `meta.json` for each block. During this initial bucket scanning phase, a querier is not ready to handle incoming queries yet and its `/ready` readiness probe endpoint will fail.
+The querier needs to have an almost up-to-date view over the entire storage bucket, in order to find the right blocks to look up at query time. The querier can keep the bucket view updated in two different ways:
+
+1. Periodically scanning the bucket (default)
+2. Periodically downloading the [bucket index](./bucket-index.md)
+
+### Bucket index disabled (default)
+
+At startup, **queriers** iterate over the entire storage bucket to discover all tenants' blocks and download the `meta.json` for each block. During this initial bucket scanning phase, a querier is not ready to handle incoming queries yet and its `/ready` readiness probe endpoint will fail.
While running, queriers periodically iterate over the storage bucket to discover new tenants and recently uploaded blocks. Queriers do **not** download any content from blocks except a small `meta.json` file containing the block's metadata (including the minimum and maximum timestamp of samples within the block).
Queriers use the metadata to compute the list of blocks that need to be queried at query time and fetch matching series from the [store-gateway](./store-gateway.md) instances holding the required blocks.
+### Bucket index enabled
+
+When the [bucket index](./bucket-index.md) is enabled, queriers lazily download the bucket index upon the first query received for a given tenant, cache it in memory and periodically keep it updated. The bucket index contains the list of blocks and block deletion marks of a tenant, which is then used during query execution to find the set of blocks that need to be queried.
+
+Since the bucket index removes the need to scan the bucket, it brings a few benefits:
+
+1. The querier is expected to be ready shortly after startup.
+2. Lower volume of API calls to object storage.
+
### Anatomy of a query request
When a querier receives a query range request, it contains the following parameters:
@@ -60,6 +76,7 @@ Caching is optional, but **highly recommended** in a production environment. Ple
- List of blocks per tenant
- Block's `meta.json` content
- Block's `deletion-mark.json` existence and content
+- Tenant's `bucket-index.json.gz` content
Using the metadata cache can significantly reduce the number of API calls to object storage and protects from linearly scale the number of these API calls with the number of querier and store-gateway instances (because the bucket is periodically scanned and synched by each querier and store-gateway).
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index c82aadbe6ae..99e7c5b2d13 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -125,6 +125,7 @@ Store-gateway and [querier](./querier.md) can use memcached for caching bucket m
- List of blocks per tenant
- Block's `meta.json` content
- Block's `deletion-mark.json` existence and content
+- Tenant's `bucket-index.json.gz` content
Using the metadata cache can significantly reduce the number of API calls to object storage and protects from linearly scale the number of these API calls with the number of querier and store-gateway instances (because the bucket is periodically scanned and synched by each querier and store-gateway).
@@ -391,8 +392,8 @@ blocks_storage:
# CLI flag: -blocks-storage.filesystem.dir
[dir: | default = ""]
- # This configures how the store-gateway synchronizes blocks stored in the
- # bucket.
+ # This configures how the querier and store-gateway discover and synchronize
+ # blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
@@ -629,7 +630,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-content-ttl
[metafile_content_ttl: | default = 24h]
- # Maximum size of metafile content to cache in bytes.
+ # Maximum size of metafile content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid
+ # network round trip for large content if the configured caching backend
+      # has a hard limit on cached items size (in this case, you should set
+ # this limit to the same limit in the caching backend).
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-max-size-bytes
[metafile_max_size_bytes: | default = 1048576]
@@ -637,6 +642,18 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: | default = 168h]
+ # How long to cache content of the bucket index.
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
+ [bucket_index_content_ttl: | default = 5m]
+
+ # Maximum size of bucket index content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid
+ # network round trip for large content if the configured caching backend
+      # has a hard limit on cached items size (in this case, you should set
+ # this limit to the same limit in the caching backend).
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
+ [bucket_index_max_size_bytes: | default = 1048576]
+
# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
@@ -646,6 +663,33 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: | default = 6h]
+ bucket_index:
+ # True to enable querier to discover blocks in the storage via bucket
+ # index instead of bucket scanning.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
+ [enabled: | default = false]
+
+ # How frequently a cached bucket index should be refreshed.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
+ [update_on_stale_interval: | default = 15m]
+
+ # How frequently a bucket index, which previously failed to load, should
+      # be retried.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
+ [update_on_error_interval: | default = 1m]
+
+      # How long an unused bucket index should be cached. Once this timeout
+ # expires, the unused bucket index is removed from the in-memory cache.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
+ [idle_timeout: | default = 1h]
+
+ # The maximum allowed age of a bucket index (last updated) before queries
+ # start failing because the bucket index is too old. The bucket index is
+ # periodically updated by the compactor, while this check is enforced in
+ # the querier (at query time).
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
+ [max_stale_period: | default = 1h]
+
tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
diff --git a/docs/blocks-storage/store-gateway.template b/docs/blocks-storage/store-gateway.template
index c2a123d71b6..49526b55820 100644
--- a/docs/blocks-storage/store-gateway.template
+++ b/docs/blocks-storage/store-gateway.template
@@ -125,6 +125,7 @@ Store-gateway and [querier](./querier.md) can use memcached for caching bucket m
- List of blocks per tenant
- Block's `meta.json` content
- Block's `deletion-mark.json` existence and content
+- Tenant's `bucket-index.json.gz` content
Using the metadata cache can significantly reduce the number of API calls to object storage and protects from linearly scale the number of these API calls with the number of querier and store-gateway instances (because the bucket is periodically scanned and synched by each querier and store-gateway).
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 47ee3eab8c2..2201a65c2cc 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3621,8 +3621,8 @@ filesystem:
# CLI flag: -blocks-storage.filesystem.dir
[dir: | default = ""]
-# This configures how the store-gateway synchronizes blocks stored in the
-# bucket.
+# This configures how the querier and store-gateway discover and synchronize
+# blocks stored in the bucket.
bucket_store:
# Directory to store synchronized TSDB index headers.
# CLI flag: -blocks-storage.bucket-store.sync-dir
@@ -3858,7 +3858,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-content-ttl
[metafile_content_ttl: | default = 24h]
- # Maximum size of metafile content to cache in bytes.
+ # Maximum size of metafile content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid network
+    # round trip for large content if the configured caching backend has a hard
+ # limit on cached items size (in this case, you should set this limit to the
+ # same limit in the caching backend).
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-max-size-bytes
[metafile_max_size_bytes: | default = 1048576]
@@ -3866,6 +3870,18 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl
[metafile_attributes_ttl: | default = 168h]
+ # How long to cache content of the bucket index.
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl
+ [bucket_index_content_ttl: | default = 5m]
+
+ # Maximum size of bucket index content to cache in bytes. Caching will be
+ # skipped if the content exceeds this size. This is useful to avoid network
+    # round trip for large content if the configured caching backend has a hard
+ # limit on cached items size (in this case, you should set this limit to the
+ # same limit in the caching backend).
+ # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
+ [bucket_index_max_size_bytes: | default = 1048576]
+
# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to ignore
# blocks that are marked for deletion with some delay. This ensures store can
@@ -3875,6 +3891,33 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: | default = 6h]
+ bucket_index:
+ # True to enable querier to discover blocks in the storage via bucket index
+ # instead of bucket scanning.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.enabled
+ [enabled: | default = false]
+
+ # How frequently a cached bucket index should be refreshed.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-stale-interval
+ [update_on_stale_interval: | default = 15m]
+
+ # How frequently a bucket index, which previously failed to load, should be
+    # retried.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
+ [update_on_error_interval: | default = 1m]
+
+    # How long an unused bucket index should be cached. Once this timeout
+ # expires, the unused bucket index is removed from the in-memory cache.
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
+ [idle_timeout: | default = 1h]
+
+ # The maximum allowed age of a bucket index (last updated) before queries
+ # start failing because the bucket index is too old. The bucket index is
+ # periodically updated by the compactor, while this check is enforced in the
+ # querier (at query time).
+ # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
+ [max_stale_period: | default = 1h]
+
tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 920e475450b..6edcc4fb159 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -64,4 +64,5 @@ Currently experimental features are:
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
- Blocks storage bucket index
+ - The bucket index support in the querier (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions
diff --git a/docs/proposals/blocks-storage-bucket-index.md b/docs/proposals/blocks-storage-bucket-index.md
index 7780d453515..d2cfdfae0b9 100644
--- a/docs/proposals/blocks-storage-bucket-index.md
+++ b/docs/proposals/blocks-storage-bucket-index.md
@@ -41,7 +41,7 @@ We believe the same technique described in this proposal could be applied to opt
We propose to introduce a per-tenant bucket index. The index is a single JSON file containing two main information: list of all completed (non partial) blocks in the bucket + list of all deletion marks. The bucket index is stored in the bucket within the tenant location (eg. `/user-1/bucket-index.json`) and is kept updated by the compactor.
-The querier, at query time, checks whether the bucket index for the tenant has already been loaded in memory. If not, the querier will download it and cache it in memory. Given it's a small file, we expect the lazy download of the bucket index to not significantly impact 1st query performances.
+The querier, at query time, checks whether the bucket index for the tenant has already been loaded in memory. If not, the querier will download it and cache it in memory. Given it's a small file, we expect the lazy download of the bucket index to not significantly impact first query performance.
While in-memory, a background process will keep it updated at periodic intervals (configurable), so that subsequent queries from the same tenant to the same querier instance will use the cached (and periodically updated) bucket index.
diff --git a/integration/e2e/metrics.go b/integration/e2e/metrics.go
index cdfa799e70b..18378fb447b 100644
--- a/integration/e2e/metrics.go
+++ b/integration/e2e/metrics.go
@@ -107,6 +107,16 @@ func Greater(value float64) func(sums ...float64) bool {
}
}
+// GreaterOrEqual is an isExpected function for WaitSumMetrics that returns true if the given single sum is greater than or equal to the given value.
+func GreaterOrEqual(value float64) func(sums ...float64) bool {
+	return func(sums ...float64) bool {
+		if len(sums) != 1 {
+			panic("greaterOrEqual: expected one value")
+		}
+		return sums[0] >= value
+	}
+}
+
// Less is an isExpected function for WaitSumMetrics that returns true if given single sum is less than given value.
func Less(value float64) func(sums ...float64) bool {
return func(sums ...float64) bool {
diff --git a/integration/e2ecortex/services.go b/integration/e2ecortex/services.go
index 5654ef68a85..8e3103aab8c 100644
--- a/integration/e2ecortex/services.go
+++ b/integration/e2ecortex/services.go
@@ -241,6 +241,39 @@ func NewQuerySchedulerWithConfigFile(name, configFile string, flags map[string]s
)
}
+func NewCompactor(name string, consulAddress string, flags map[string]string, image string) *CortexService {
+ return NewCompactorWithConfigFile(name, consulAddress, "", flags, image)
+}
+
+func NewCompactorWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService {
+ if configFile != "" {
+ flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
+ }
+
+ if image == "" {
+ image = GetDefaultImage()
+ }
+
+ return NewCortexService(
+ name,
+ image,
+ e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
+ "-target": "compactor",
+ "-log.level": "warn",
+			// Compactor ring backend.
+ "-compactor.sharding-enabled": "true",
+ "-compactor.ring.store": "consul",
+ "-compactor.ring.consul.hostname": consulAddress,
+ // Startup quickly.
+ "-compactor.ring.wait-stability-min-duration": "0",
+ "-compactor.ring.wait-stability-max-duration": "0",
+ }, flags))...),
+ e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
+ httpPort,
+ grpcPort,
+ )
+}
+
func NewSingleBinary(name string, flags map[string]string, image string, otherPorts ...int) *CortexService {
if image == "" {
image = GetDefaultImage()
diff --git a/integration/querier_test.go b/integration/querier_test.go
index f177264a0d1..baef356dfce 100644
--- a/integration/querier_test.go
+++ b/integration/querier_test.go
@@ -30,6 +30,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
tenantShardSize int
ingesterStreamingEnabled bool
indexCacheBackend string
+ bucketIndexEnabled bool
}{
"blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
blocksShardingStrategy: "",
@@ -60,6 +61,19 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
+ "blocks default sharding, ingester gRPC streaming enabled, inmemory index cache, bucket index enabled": {
+ blocksShardingStrategy: "default",
+ ingesterStreamingEnabled: true,
+ indexCacheBackend: tsdb.IndexCacheBackendInMemory,
+ bucketIndexEnabled: true,
+ },
+ "blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
+ blocksShardingStrategy: "shuffle-sharding",
+ tenantShardSize: 1,
+ ingesterStreamingEnabled: true,
+ indexCacheBackend: tsdb.IndexCacheBackendMemcached,
+ bucketIndexEnabled: true,
+ },
}
for testName, testCfg := range tests {
@@ -73,16 +87,17 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
- "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
- "-blocks-storage.tsdb.ship-interval": "1s",
- "-blocks-storage.bucket-store.sync-interval": "1s",
- "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
- "-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
- "-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
- "-store-gateway.sharding-strategy": testCfg.blocksShardingStrategy,
- "-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
- "-querier.ingester-streaming": strconv.FormatBool(testCfg.ingesterStreamingEnabled),
- "-querier.query-store-for-labels-enabled": "true",
+ "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
+ "-blocks-storage.tsdb.ship-interval": "1s",
+ "-blocks-storage.bucket-store.sync-interval": "1s",
+ "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
+ "-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
+ "-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
+ "-store-gateway.sharding-strategy": testCfg.blocksShardingStrategy,
+ "-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
+ "-querier.ingester-streaming": strconv.FormatBool(testCfg.ingesterStreamingEnabled),
+ "-querier.query-store-for-labels-enabled": "true",
+ "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
})
// Start dependencies.
@@ -158,8 +173,14 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
- // Wait until the querier has discovered the uploaded blocks.
- require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_blocks_meta_synced"))
+ if testCfg.bucketIndexEnabled {
+ // Start the compactor to have the bucket index created before querying.
+ compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "")
+ require.NoError(t, s.StartAndWaitReady(compactor))
+ } else {
+ // Wait until the querier has discovered the uploaded blocks.
+ require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_blocks_meta_synced"))
+ }
// Wait until the store-gateway has synched the new uploaded blocks. When sharding is enabled
// we don't known which store-gateway instance will synch the blocks, so we need to wait on
@@ -239,6 +260,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
blocksShardingEnabled bool
ingesterStreamingEnabled bool
indexCacheBackend string
+ bucketIndexEnabled bool
}{
"blocks sharding enabled, ingester gRPC streaming disabled, inmemory index cache": {
blocksShardingEnabled: true,
@@ -263,6 +285,12 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
+ "blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
+ blocksShardingEnabled: true,
+ ingesterStreamingEnabled: true,
+ indexCacheBackend: tsdb.IndexCacheBackendMemcached,
+ bucketIndexEnabled: true,
+ },
}
for testName, testCfg := range tests {
@@ -293,6 +321,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
"-blocks-storage.bucket-store.index-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
+ "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
"-querier.ingester-streaming": strconv.FormatBool(testCfg.ingesterStreamingEnabled),
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
@@ -361,17 +390,24 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(3*cluster.NumInstances())), "cortex_ingester_memory_series_created_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total"))
- // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway).
- require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Equals(float64(2*cluster.NumInstances()*2)), []string{"cortex_blocks_meta_synced"}, e2e.WithLabelMatchers(
- labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
+ if testCfg.bucketIndexEnabled {
+ // Start the compactor to have the bucket index created before querying. We need to run the compactor
+ // as a separate service because it's currently not part of the single binary.
+ compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "")
+ require.NoError(t, s.StartAndWaitReady(compactor))
+ } else {
+ // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway).
+ require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Equals(float64(2*cluster.NumInstances()*2)), []string{"cortex_blocks_meta_synced"}, e2e.WithLabelMatchers(
+ labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
+ }
- // Wait until the store-gateway has synched the new uploaded blocks.
+ // Wait until the store-gateway has synched the new uploaded blocks. The number of blocks loaded
+			// may be greater than expected if the compactor is running (blocks may have been compacted).
const shippedBlocks = 2
-
if testCfg.blocksShardingEnabled {
- require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded"))
+ require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded"))
} else {
- require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(shippedBlocks*seriesReplicationFactor*cluster.NumInstances())), "cortex_bucket_store_blocks_loaded"))
+ require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor*cluster.NumInstances())), "cortex_bucket_store_blocks_loaded"))
}
// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go
new file mode 100644
index 00000000000..0aa6b175c35
--- /dev/null
+++ b/pkg/querier/blocks_finder_bucket_index.go
@@ -0,0 +1,108 @@
+package querier
+
+import (
+ "context"
+ "time"
+
+ "github.com/go-kit/kit/log"
+ "github.com/oklog/ulid"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/thanos-io/thanos/pkg/objstore"
+
+ "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
+ "github.com/cortexproject/cortex/pkg/util/services"
+)
+
+var (
+ errBucketIndexBlocksFinderNotRunning = errors.New("bucket index blocks finder is not running")
+ errBucketIndexTooOld = errors.New("bucket index is too old and the last time it was updated exceeds the allowed max staleness")
+)
+
+// BucketIndexBlocksFinderConfig holds the configuration of the BucketIndexBlocksFinder.
+type BucketIndexBlocksFinderConfig struct {
+ IndexLoader bucketindex.LoaderConfig
+ MaxStalePeriod time.Duration
+ IgnoreDeletionMarksDelay time.Duration
+}
+
+// BucketIndexBlocksFinder implements the BlocksFinder interface and finds blocks in the bucket
+// by looking up the bucket index.
+type BucketIndexBlocksFinder struct {
+ services.Service
+
+ cfg BucketIndexBlocksFinderConfig
+ loader *bucketindex.Loader
+}
+
+func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *BucketIndexBlocksFinder {
+ loader := bucketindex.NewLoader(cfg.IndexLoader, bkt, logger, reg)
+
+ return &BucketIndexBlocksFinder{
+ cfg: cfg,
+ loader: loader,
+ Service: loader,
+ }
+}
+
+// GetBlocks implements BlocksFinder.
+func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
+ if f.State() != services.Running {
+ return nil, nil, errBucketIndexBlocksFinderNotRunning
+ }
+ if maxT < minT {
+ return nil, nil, errInvalidBlocksRange
+ }
+
+ // Get the bucket index for this user.
+ idx, err := f.loader.GetIndex(ctx, userID)
+ if errors.Is(err, bucketindex.ErrIndexNotFound) {
+ // This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet
+ // so the bucket index hasn't been created yet.
+ return nil, nil, nil
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Ensure the bucket index is not too old.
+ if time.Since(idx.GetUpdatedAt()) > f.cfg.MaxStalePeriod {
+ return nil, nil, errBucketIndexTooOld
+ }
+
+ var (
+ matchingBlocks = map[ulid.ULID]*bucketindex.Block{}
+ matchingDeletionMarks = map[ulid.ULID]*bucketindex.BlockDeletionMark{}
+ )
+
+ // Filter blocks containing samples within the range.
+ for _, block := range idx.Blocks {
+ if !block.Within(minT, maxT) {
+ continue
+ }
+
+ matchingBlocks[block.ID] = block
+ }
+
+ for _, mark := range idx.BlockDeletionMarks {
+ // Filter deletion marks by matching blocks only.
+ if _, ok := matchingBlocks[mark.ID]; !ok {
+ continue
+ }
+
+ // Exclude blocks marked for deletion. This is the same logic as Thanos IgnoreDeletionMarkFilter.
+ if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.cfg.IgnoreDeletionMarksDelay.Seconds() {
+ delete(matchingBlocks, mark.ID)
+ continue
+ }
+
+ matchingDeletionMarks[mark.ID] = mark
+ }
+
+ // Convert matching blocks into a list.
+ blocks := make(bucketindex.Blocks, 0, len(matchingBlocks))
+ for _, b := range matchingBlocks {
+ blocks = append(blocks, b)
+ }
+
+ return blocks, matchingDeletionMarks, nil
+}
diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go
new file mode 100644
index 00000000000..2de9804a5cb
--- /dev/null
+++ b/pkg/querier/blocks_finder_bucket_index_test.go
@@ -0,0 +1,220 @@
+package querier
+
+import (
+ "context"
+ "path"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/go-kit/kit/log"
+ "github.com/oklog/ulid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/thanos-io/thanos/pkg/objstore"
+
+ "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
+ "github.com/cortexproject/cortex/pkg/util/services"
+)
+
+func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) {
+ const userID = "user-1"
+
+ ctx := context.Background()
+ bkt, _ := prepareFilesystemBucket(t)
+
+ // Mock a bucket index.
+ block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 15}
+ block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil), MinTime: 12, MaxTime: 20}
+ block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil), MinTime: 20, MaxTime: 30}
+ block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: 30, MaxTime: 40}
+ block5 := &bucketindex.Block{ID: ulid.MustNew(5, nil), MinTime: 30, MaxTime: 40} // Time range overlaps with block4, but this block deletion mark is above the threshold.
+ mark3 := &bucketindex.BlockDeletionMark{ID: block3.ID, DeletionTime: time.Now().Unix()}
+ mark5 := &bucketindex.BlockDeletionMark{ID: block5.ID, DeletionTime: time.Now().Add(-2 * time.Hour).Unix()}
+
+ require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, &bucketindex.Index{
+ Version: bucketindex.IndexVersion1,
+ Blocks: bucketindex.Blocks{block1, block2, block3, block4, block5},
+ BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark3, mark5},
+ UpdatedAt: time.Now().Unix(),
+ }))
+
+ finder := prepareBucketIndexBlocksFinder(t, bkt)
+
+ tests := map[string]struct {
+ minT int64
+ maxT int64
+ expectedBlocks bucketindex.Blocks
+ expectedMarks map[ulid.ULID]*bucketindex.BlockDeletionMark
+ }{
+ "no matching block because the range is too low": {
+ minT: 0,
+ maxT: 5,
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{},
+ },
+ "no matching block because the range is too high": {
+ minT: 50,
+ maxT: 60,
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{},
+ },
+ "matching all blocks": {
+ minT: 0,
+ maxT: 60,
+ expectedBlocks: bucketindex.Blocks{block4, block3, block2, block1},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
+ block3.ID: mark3,
+ },
+ },
+ "query range starting at a block maxT": {
+ minT: block3.MaxTime,
+ maxT: 60,
+ expectedBlocks: bucketindex.Blocks{block4},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{},
+ },
+ "query range ending at a block minT": {
+ minT: block3.MinTime,
+ maxT: block4.MinTime,
+ expectedBlocks: bucketindex.Blocks{block4, block3},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
+ block3.ID: mark3,
+ },
+ },
+ "query range within a single block": {
+ minT: block3.MinTime + 2,
+ maxT: block3.MaxTime - 2,
+ expectedBlocks: bucketindex.Blocks{block3},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
+ block3.ID: mark3,
+ },
+ },
+ "query range within multiple blocks": {
+ minT: 13,
+ maxT: 16,
+ expectedBlocks: bucketindex.Blocks{block2, block1},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{},
+ },
+ "query range matching exactly a single block": {
+ minT: block3.MinTime,
+ maxT: block3.MaxTime - 1,
+ expectedBlocks: bucketindex.Blocks{block3},
+ expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
+ block3.ID: mark3,
+ },
+ },
+ }
+
+ for testName, testData := range tests {
+ t.Run(testName, func(t *testing.T) {
+ blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, testData.minT, testData.maxT)
+ require.NoError(t, err)
+ require.ElementsMatch(t, testData.expectedBlocks, blocks)
+ require.Equal(t, testData.expectedMarks, deletionMarks)
+ })
+ }
+}
+
+func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) {
+ const (
+ numBlocks = 1000
+ numDeletionMarks = 100
+ userID = "user-1"
+ )
+
+ ctx := context.Background()
+ bkt, _ := prepareFilesystemBucket(b)
+
+ // Mock a bucket index.
+ idx := &bucketindex.Index{
+ Version: bucketindex.IndexVersion1,
+ UpdatedAt: time.Now().Unix(),
+ }
+
+ for i := 1; i <= numBlocks; i++ {
+ id := ulid.MustNew(uint64(i), nil)
+ minT := int64(i * 10)
+ maxT := int64((i + 1) * 10)
+ idx.Blocks = append(idx.Blocks, &bucketindex.Block{ID: id, MinTime: minT, MaxTime: maxT})
+ }
+ for i := 1; i <= numDeletionMarks; i++ {
+ id := ulid.MustNew(uint64(i), nil)
+ idx.BlockDeletionMarks = append(idx.BlockDeletionMarks, &bucketindex.BlockDeletionMark{ID: id, DeletionTime: time.Now().Unix()})
+ }
+ require.NoError(b, bucketindex.WriteIndex(ctx, bkt, userID, idx))
+ finder := prepareBucketIndexBlocksFinder(b, bkt)
+
+ b.ResetTimer()
+
+ for n := 0; n < b.N; n++ {
+ blocks, marks, err := finder.GetBlocks(ctx, userID, 100, 200)
+ if err != nil || len(blocks) != 11 || len(marks) != 11 {
+ b.Fail()
+ }
+ }
+}
+
+func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) {
+ const userID = "user-1"
+
+ ctx := context.Background()
+ bkt, _ := prepareFilesystemBucket(t)
+ finder := prepareBucketIndexBlocksFinder(t, bkt)
+
+ blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20)
+ require.NoError(t, err)
+ assert.Empty(t, blocks)
+ assert.Empty(t, deletionMarks)
+}
+
+func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) {
+ const userID = "user-1"
+
+ ctx := context.Background()
+ bkt, _ := prepareFilesystemBucket(t)
+ finder := prepareBucketIndexBlocksFinder(t, bkt)
+
+ // Upload a corrupted bucket index.
+ require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!")))
+
+ _, _, err := finder.GetBlocks(ctx, userID, 10, 20)
+ require.Equal(t, bucketindex.ErrIndexCorrupted, err)
+}
+
+func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) {
+ const userID = "user-1"
+
+ ctx := context.Background()
+ bkt, _ := prepareFilesystemBucket(t)
+ finder := prepareBucketIndexBlocksFinder(t, bkt)
+
+ require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, &bucketindex.Index{
+ Version: bucketindex.IndexVersion1,
+ Blocks: bucketindex.Blocks{},
+ BlockDeletionMarks: bucketindex.BlockDeletionMarks{},
+ UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(),
+ }))
+
+ _, _, err := finder.GetBlocks(ctx, userID, 10, 20)
+ require.Equal(t, errBucketIndexTooOld, err)
+}
+
+func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder {
+ ctx := context.Background()
+ cfg := BucketIndexBlocksFinderConfig{
+ IndexLoader: bucketindex.LoaderConfig{
+ CheckInterval: time.Minute,
+ UpdateOnStaleInterval: time.Minute,
+ UpdateOnErrorInterval: time.Minute,
+ IdleTimeout: time.Minute,
+ },
+ MaxStalePeriod: time.Hour,
+ IgnoreDeletionMarksDelay: time.Hour,
+ }
+
+ finder := NewBucketIndexBlocksFinder(cfg, bkt, log.NewNopLogger(), nil)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, finder))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, finder))
+ })
+
+ return finder
+}
diff --git a/pkg/querier/blocks_scanner.go b/pkg/querier/blocks_finder_bucket_scan.go
similarity index 86%
rename from pkg/querier/blocks_scanner.go
rename to pkg/querier/blocks_finder_bucket_scan.go
index 5d2fb56083f..f75dc974e31 100644
--- a/pkg/querier/blocks_scanner.go
+++ b/pkg/querier/blocks_finder_bucket_scan.go
@@ -29,11 +29,11 @@ import (
)
var (
- errBlocksScannerNotRunning = errors.New("blocks scanner is not running")
- errInvalidBlocksRange = errors.New("invalid blocks time range")
+ errBucketScanBlocksFinderNotRunning = errors.New("bucket scan blocks finder is not running")
+ errInvalidBlocksRange = errors.New("invalid blocks time range")
)
-type BlocksScannerConfig struct {
+type BucketScanBlocksFinderConfig struct {
ScanInterval time.Duration
TenantsConcurrency int
MetasConcurrency int
@@ -42,10 +42,11 @@ type BlocksScannerConfig struct {
IgnoreDeletionMarksDelay time.Duration
}
-type BlocksScanner struct {
+// BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks.
+type BucketScanBlocksFinder struct {
services.Service
- cfg BlocksScannerConfig
+ cfg BucketScanBlocksFinderConfig
logger log.Logger
bucketClient objstore.Bucket
fetchersMetrics *storegateway.MetadataFetcherMetrics
@@ -66,8 +67,8 @@ type BlocksScanner struct {
scanLastSuccess prometheus.Gauge
}
-func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *BlocksScanner {
- d := &BlocksScanner{
+func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, bucketClient objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *BucketScanBlocksFinder {
+ d := &BucketScanBlocksFinder{
cfg: cfg,
logger: logger,
bucketClient: bucketClient,
@@ -102,10 +103,10 @@ func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, log
// GetBlocks returns known blocks for userID containing samples within the range minT
// and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending.
-func (d *BlocksScanner) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
+func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
// We need to ensure the initial full bucket scan succeeded.
if d.State() != services.Running {
- return nil, nil, errBlocksScannerNotRunning
+ return nil, nil, errBucketScanBlocksFinderNotRunning
}
if maxT < minT {
return nil, nil, errInvalidBlocksRange
@@ -123,8 +124,7 @@ func (d *BlocksScanner) GetBlocks(_ context.Context, userID string, minT, maxT i
// to "now", we're going to find matching blocks iterating the list in reverse order.
var matchingMetas bucketindex.Blocks
for i := len(userMetas) - 1; i >= 0; i-- {
- // NOTE: Block intervals are half-open: [MinTime, MaxTime).
- if userMetas[i].MinTime <= maxT && minT < userMetas[i].MaxTime {
+ if userMetas[i].Within(minT, maxT) {
matchingMetas = append(matchingMetas, userMetas[i])
}
@@ -147,7 +147,7 @@ func (d *BlocksScanner) GetBlocks(_ context.Context, userID string, minT, maxT i
return matchingMetas, matchingDeletionMarks, nil
}
-func (d *BlocksScanner) starting(ctx context.Context) error {
+func (d *BucketScanBlocksFinder) starting(ctx context.Context) error {
// Before the service is in the running state it must have successfully
// complete the initial scan.
if err := d.scanBucket(ctx); err != nil {
@@ -158,7 +158,7 @@ func (d *BlocksScanner) starting(ctx context.Context) error {
return nil
}
-func (d *BlocksScanner) scan(ctx context.Context) error {
+func (d *BucketScanBlocksFinder) scan(ctx context.Context) error {
if err := d.scanBucket(ctx); err != nil {
level.Error(d.logger).Log("msg", "failed to scan bucket storage to find blocks", "err", err)
}
@@ -167,7 +167,7 @@ func (d *BlocksScanner) scan(ctx context.Context) error {
return nil
}
-func (d *BlocksScanner) scanBucket(ctx context.Context) (returnErr error) {
+func (d *BucketScanBlocksFinder) scanBucket(ctx context.Context) (returnErr error) {
defer func(start time.Time) {
d.scanDuration.Observe(time.Since(start).Seconds())
if returnErr == nil {
@@ -266,7 +266,7 @@ pushJobsLoop:
// scanUserBlocksWithRetries runs scanUserBlocks() retrying multiple times
// in case of error.
-func (d *BlocksScanner) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas bucketindex.Blocks, deletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, err error) {
+func (d *BucketScanBlocksFinder) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas bucketindex.Blocks, deletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, err error) {
retries := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: time.Second,
MaxBackoff: 30 * time.Second,
@@ -285,7 +285,7 @@ func (d *BlocksScanner) scanUserBlocksWithRetries(ctx context.Context, userID st
return
}
-func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
+func (d *BucketScanBlocksFinder) scanUserBlocks(ctx context.Context, userID string) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
fetcher, userBucket, deletionMarkFilter, err := d.getOrCreateMetaFetcher(userID)
if err != nil {
return nil, nil, errors.Wrapf(err, "create meta fetcher for user %s", userID)
@@ -327,7 +327,7 @@ func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) (buck
}
// The blocks scanner expects all blocks to be sorted by max time.
- sortBlockMetasByMaxTime(res)
+ sortBlocksByMaxTime(res)
// Convert deletion marks to our own data type.
marks := map[ulid.ULID]*bucketindex.BlockDeletionMark{}
@@ -338,7 +338,7 @@ func (d *BlocksScanner) scanUserBlocks(ctx context.Context, userID string) (buck
return res, marks, nil
}
-func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
+func (d *BucketScanBlocksFinder) getOrCreateMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
d.fetchersMx.Lock()
defer d.fetchersMx.Unlock()
@@ -360,7 +360,7 @@ func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFet
return fetcher, userBucket, deletionMarkFilter, nil
}
-func (d *BlocksScanner) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
+func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
userLogger := util.WithUserID(userID, d.logger)
userBucket := bucket.NewUserBucketClient(userID, d.bucketClient)
userReg := prometheus.NewRegistry()
@@ -392,7 +392,7 @@ func (d *BlocksScanner) createMetaFetcher(userID string) (block.MetadataFetcher,
return f, userBucket, deletionMarkFilter, nil
}
-func (d *BlocksScanner) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block {
+func (d *BucketScanBlocksFinder) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block {
d.userMx.RLock()
defer d.userMx.RUnlock()
@@ -404,7 +404,7 @@ func (d *BlocksScanner) getBlockMeta(userID string, blockID ulid.ULID) *bucketin
return metas[blockID]
}
-func sortBlockMetasByMaxTime(blocks bucketindex.Blocks) {
+func sortBlocksByMaxTime(blocks bucketindex.Blocks) {
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].MaxTime < blocks[j].MaxTime
})
diff --git a/pkg/querier/blocks_scanner_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go
similarity index 85%
rename from pkg/querier/blocks_scanner_test.go
rename to pkg/querier/blocks_finder_bucket_scan_test.go
index 5c115476c5d..a09f37fd143 100644
--- a/pkg/querier/blocks_scanner_test.go
+++ b/pkg/querier/blocks_finder_bucket_scan_test.go
@@ -29,10 +29,9 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)
-func TestBlocksScanner_InitialScan(t *testing.T) {
+func TestBucketScanBlocksFinder_InitialScan(t *testing.T) {
ctx := context.Background()
- s, bucket, _, reg, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, reg := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
user1Block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
user1Block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
@@ -80,7 +79,7 @@ func TestBlocksScanner_InitialScan(t *testing.T) {
assert.Greater(t, testutil.ToFloat64(s.scanLastSuccess), float64(0))
}
-func TestBlocksScanner_InitialScanFailure(t *testing.T) {
+func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) {
cacheDir, err := ioutil.TempDir(os.TempDir(), "blocks-scanner-test-cache")
require.NoError(t, err)
defer os.RemoveAll(cacheDir) //nolint: errcheck
@@ -89,10 +88,10 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) {
bucket := &bucket.ClientMock{}
reg := prometheus.NewPedanticRegistry()
- cfg := prepareBlocksScannerConfig()
+ cfg := prepareBucketScanBlocksFinderConfig()
cfg.CacheDir = cacheDir
- s := NewBlocksScanner(cfg, bucket, log.NewNopLogger(), reg)
+ s := NewBucketScanBlocksFinder(cfg, bucket, log.NewNopLogger(), reg)
defer func() {
s.StopAsync()
s.AwaitTerminated(context.Background()) //nolint: errcheck
@@ -108,7 +107,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) {
require.Error(t, s.AwaitRunning(ctx))
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
- assert.Equal(t, errBlocksScannerNotRunning, err)
+ assert.Equal(t, errBucketScanBlocksFinderNotRunning, err)
assert.Nil(t, blocks)
assert.Nil(t, deletionMarks)
@@ -136,7 +135,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) {
))
}
-func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T) {
+func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T) {
tenantIDs := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
// Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket.
@@ -153,12 +152,12 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T)
require.NoError(t, err)
defer os.RemoveAll(cacheDir)
- cfg := prepareBlocksScannerConfig()
+ cfg := prepareBucketScanBlocksFinderConfig()
cfg.CacheDir = cacheDir
cfg.MetasConcurrency = 1
cfg.TenantsConcurrency = 1
- s := NewBlocksScanner(cfg, bucket, log.NewLogfmtLogger(os.Stdout), nil)
+ s := NewBucketScanBlocksFinder(cfg, bucket, log.NewLogfmtLogger(os.Stdout), nil)
// Start the scanner, let it run for 1s and then issue a stop.
require.NoError(t, s.StartAsync(context.Background()))
@@ -172,7 +171,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T)
assert.Less(t, time.Since(stopTime).Nanoseconds(), (3 * time.Second).Nanoseconds())
}
-func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T) {
+func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T) {
var blockPaths []string
for i := 1; i <= 10; i++ {
blockPaths = append(blockPaths, "user-1/"+ulid.MustNew(uint64(i), nil).String())
@@ -191,12 +190,12 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T)
require.NoError(t, err)
defer os.RemoveAll(cacheDir)
- cfg := prepareBlocksScannerConfig()
+ cfg := prepareBucketScanBlocksFinderConfig()
cfg.CacheDir = cacheDir
cfg.MetasConcurrency = 1
cfg.TenantsConcurrency = 1
- s := NewBlocksScanner(cfg, bucket, log.NewLogfmtLogger(os.Stdout), nil)
+ s := NewBucketScanBlocksFinder(cfg, bucket, log.NewLogfmtLogger(os.Stdout), nil)
// Start the scanner, let it run for 1s and then issue a stop.
require.NoError(t, s.StartAsync(context.Background()))
@@ -210,10 +209,9 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T)
assert.Less(t, time.Since(stopTime).Nanoseconds(), (3 * time.Second).Nanoseconds())
}
-func TestBlocksScanner_PeriodicScanFindsNewUser(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
@@ -241,10 +239,9 @@ func TestBlocksScanner_PeriodicScanFindsNewUser(t *testing.T) {
}, deletionMarks)
}
-func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
@@ -272,10 +269,9 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
assert.Empty(t, deletionMarks)
}
-func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
@@ -304,10 +300,9 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
}, deletionMarks)
}
-func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
@@ -333,10 +328,9 @@ func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) {
assert.Empty(t, deletionMarks)
}
-func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
@@ -361,10 +355,9 @@ func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) {
assert.Empty(t, deletionMarks)
}
-func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing.T) {
+func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
@@ -400,10 +393,9 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing
assert.Empty(t, deletionMarks)
}
-func TestBlocksScanner_GetBlocks(t *testing.T) {
+func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
ctx := context.Background()
- s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
- defer cleanup()
+ s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 15)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 12, 20)
@@ -489,35 +481,46 @@ func TestBlocksScanner_GetBlocks(t *testing.T) {
}
}
-func prepareBlocksScanner(t *testing.T, cfg BlocksScannerConfig) (*BlocksScanner, objstore.Bucket, string, *prometheus.Registry, func()) {
+func prepareBucketScanBlocksFinder(t *testing.T, cfg BucketScanBlocksFinderConfig) (*BucketScanBlocksFinder, objstore.Bucket, string, *prometheus.Registry) {
cacheDir, err := ioutil.TempDir(os.TempDir(), "blocks-scanner-test-cache")
require.NoError(t, err)
+ t.Cleanup(func() {
+ require.NoError(t, os.RemoveAll(cacheDir))
+ })
- storageDir, err := ioutil.TempDir(os.TempDir(), "blocks-scanner-test-storage")
- require.NoError(t, err)
-
- bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
- require.NoError(t, err)
+ bkt, storageDir := prepareFilesystemBucket(t)
reg := prometheus.NewPedanticRegistry()
cfg.CacheDir = cacheDir
- s := NewBlocksScanner(cfg, bucket, log.NewNopLogger(), reg)
+ s := NewBucketScanBlocksFinder(cfg, bkt, log.NewNopLogger(), reg)
- cleanup := func() {
+ t.Cleanup(func() {
s.StopAsync()
- s.AwaitTerminated(context.Background()) //nolint: errcheck
- require.NoError(t, os.RemoveAll(cacheDir))
- require.NoError(t, os.RemoveAll(storageDir))
- }
+ require.NoError(t, s.AwaitTerminated(context.Background()))
+ })
- return s, bucket, storageDir, reg, cleanup
+ return s, bkt, storageDir, reg
}
-func prepareBlocksScannerConfig() BlocksScannerConfig {
- return BlocksScannerConfig{
+func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig {
+ return BucketScanBlocksFinderConfig{
ScanInterval: time.Minute,
TenantsConcurrency: 10,
MetasConcurrency: 10,
IgnoreDeletionMarksDelay: time.Hour,
}
}
+
+func prepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) {
+ storageDir, err := ioutil.TempDir(os.TempDir(), "bucket")
+ require.NoError(t, err)
+
+ t.Cleanup(func() {
+ require.NoError(t, os.RemoveAll(storageDir))
+ })
+
+ bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
+ require.NoError(t, err)
+
+ return bkt, storageDir
+}
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index f60ec718227..a5fb5c913b3 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -159,20 +159,35 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
return nil, errors.Wrap(err, "failed to create bucket client")
}
- // Blocks scanner doesn't use chunks, but we pass config for consistency.
+ // Blocks finder doesn't use chunks, but we pass config for consistency.
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create caching bucket")
}
bucketClient = cachingBucket
- scanner := NewBlocksScanner(BlocksScannerConfig{
- ScanInterval: storageCfg.BucketStore.SyncInterval,
- TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency,
- MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency,
- CacheDir: storageCfg.BucketStore.SyncDir,
- IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
- }, bucketClient, logger, reg)
+ // Create the blocks finder.
+ var finder BlocksFinder
+ if storageCfg.BucketStore.BucketIndex.Enabled {
+ finder = NewBucketIndexBlocksFinder(BucketIndexBlocksFinderConfig{
+ IndexLoader: bucketindex.LoaderConfig{
+ CheckInterval: time.Minute,
+ UpdateOnStaleInterval: storageCfg.BucketStore.BucketIndex.UpdateOnStaleInterval,
+ UpdateOnErrorInterval: storageCfg.BucketStore.BucketIndex.UpdateOnErrorInterval,
+ IdleTimeout: storageCfg.BucketStore.BucketIndex.IdleTimeout,
+ },
+ MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod,
+ IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
+ }, bucketClient, logger, reg)
+ } else {
+ finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{
+ ScanInterval: storageCfg.BucketStore.SyncInterval,
+ TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency,
+ MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency,
+ CacheDir: storageCfg.BucketStore.SyncDir,
+ IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
+ }, bucketClient, logger, reg)
+ }
if gatewayCfg.ShardingEnabled {
storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig()
@@ -218,7 +233,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
reg,
)
- return NewBlocksStoreQueryable(stores, scanner, consistency, limits, querierCfg.QueryStoreAfter, logger, reg)
+ return NewBlocksStoreQueryable(stores, finder, consistency, limits, querierCfg.QueryStoreAfter, logger, reg)
}
func (q *BlocksStoreQueryable) starting(ctx context.Context) error {
diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go
index 3c26801b02d..6ca81430603 100644
--- a/pkg/storage/tsdb/bucketindex/index.go
+++ b/pkg/storage/tsdb/bucketindex/index.go
@@ -43,6 +43,10 @@ type Index struct {
UpdatedAt int64 `json:"updated_at"`
}
+func (idx *Index) GetUpdatedAt() time.Time {
+ return time.Unix(idx.UpdatedAt, 0)
+}
+
// RemoveBlock removes block and its deletion mark (if any) from index.
func (idx *Index) RemoveBlock(id ulid.ULID) {
for i := 0; i < len(idx.Blocks); i++ {
@@ -81,6 +85,13 @@ type Block struct {
UploadedAt int64 `json:"uploaded_at"`
}
+// Within returns whether the block contains samples within the provided range.
+// Input minT and maxT are both inclusive.
+func (m *Block) Within(minT, maxT int64) bool {
+ // NOTE: Block intervals are half-open: [MinTime, MaxTime).
+ return m.MinTime <= maxT && minT < m.MaxTime
+}
+
func (m *Block) GetUploadedAt() time.Time {
return time.Unix(m.UploadedAt, 0)
}
diff --git a/pkg/storage/tsdb/bucketindex/index_test.go b/pkg/storage/tsdb/bucketindex/index_test.go
index c67fc287ce8..4564e8859ca 100644
--- a/pkg/storage/tsdb/bucketindex/index_test.go
+++ b/pkg/storage/tsdb/bucketindex/index_test.go
@@ -210,6 +210,51 @@ func TestBlockFromThanosMeta(t *testing.T) {
}
}
+func TestBlock_Within(t *testing.T) {
+ tests := []struct {
+ block *Block
+ minT int64
+ maxT int64
+ expected bool
+ }{
+ {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 5,
+ maxT: 9,
+ expected: false,
+ }, {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 5,
+ maxT: 10,
+ expected: true,
+ }, {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 5,
+ maxT: 10,
+ expected: true,
+ }, {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 11,
+ maxT: 13,
+ expected: true,
+ }, {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 19,
+ maxT: 21,
+ expected: true,
+ }, {
+ block: &Block{MinTime: 10, MaxTime: 20},
+ minT: 20,
+ maxT: 21,
+ expected: false,
+ },
+ }
+
+ for _, tc := range tests {
+ assert.Equal(t, tc.expected, tc.block.Within(tc.minT, tc.maxT))
+ }
+}
+
func TestBlock_ThanosMeta(t *testing.T) {
blockID := ulid.MustNew(1, nil)
userID := "user-1"
diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go
new file mode 100644
index 00000000000..2aff2184dd2
--- /dev/null
+++ b/pkg/storage/tsdb/bucketindex/loader.go
@@ -0,0 +1,275 @@
+package bucketindex
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/go-kit/kit/log"
+ "github.com/go-kit/kit/log/level"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/thanos-io/thanos/pkg/objstore"
+ "go.uber.org/atomic"
+
+ "github.com/cortexproject/cortex/pkg/util"
+ "github.com/cortexproject/cortex/pkg/util/services"
+)
+
+const (
+ // readIndexTimeout is the maximum allowed time when reading a single bucket index
+ // from the storage. It's hard-coded to a reasonably high value.
+ readIndexTimeout = 15 * time.Second
+)
+
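+// LoaderConfig holds the configuration used by the Loader to periodically check,
+// update and offload cached bucket indexes.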
+type LoaderConfig struct {
+ CheckInterval time.Duration
+ UpdateOnStaleInterval time.Duration
+ UpdateOnErrorInterval time.Duration
+ IdleTimeout time.Duration
+}
+
+// Loader is responsible for lazily loading bucket indexes and, once loaded for the first
+// time, keeping them updated in the background. Loaded indexes are automatically offloaded
+// once the idle timeout expires.
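+// The Loader is safe for concurrent use by multiple goroutines.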
+type Loader struct {
+ services.Service
+
+ bkt objstore.Bucket
+ logger log.Logger
+ cfg LoaderConfig
+
+ indexesMx sync.RWMutex
+ indexes map[string]*cachedIndex
+
+ // Metrics.
+ loadAttempts prometheus.Counter
+ loadFailures prometheus.Counter
+ loadDuration prometheus.Histogram
+ loaded prometheus.GaugeFunc
+}
+
+// NewLoader makes a new Loader.
+func NewLoader(cfg LoaderConfig, bucketClient objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *Loader {
+ l := &Loader{
+ bkt: bucketClient,
+ logger: logger,
+ cfg: cfg,
+ indexes: map[string]*cachedIndex{},
+
+ loadAttempts: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ Name: "cortex_bucket_index_loads_total",
+ Help: "Total number of bucket index loading attempts.",
+ }),
+ loadFailures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ Name: "cortex_bucket_index_load_failures_total",
+ Help: "Total number of bucket index loading failures.",
+ }),
+ loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+ Name: "cortex_bucket_index_load_duration_seconds",
+ Help: "Duration of a single bucket index loading operation in seconds.",
+ Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.3, 1, 10},
+ }),
+ }
+
+ l.loaded = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
+ Name: "cortex_bucket_index_loaded",
+ Help: "Number of bucket indexes currently loaded in-memory.",
+ }, l.countLoadedIndexesMetric)
+
+ // Apply a jitter to the sync frequency in order to increase the probability
+ // of hitting the shared cache (if any).
+ checkInterval := util.DurationWithJitter(cfg.CheckInterval, 0.2)
+ l.Service = services.NewTimerService(checkInterval, nil, l.checkCachedIndexes, nil)
+
+ return l
+}
+
+// GetIndex returns the bucket index for the given user. It returns the in-memory cached
+// index if available, or loads it from the bucket otherwise.
+func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
+ l.indexesMx.RLock()
+ if entry := l.indexes[userID]; entry != nil {
+ idx := entry.index
+ err := entry.err
+ l.indexesMx.RUnlock()
+
+ // We don't check if the index is stale because it's the responsibility
+ // of the background job to keep it updated.
+ entry.requestedAt.Store(time.Now().Unix())
+ return idx, err
+ }
+ l.indexesMx.RUnlock()
+
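+ // Cache miss: load the index from the bucket synchronously. Concurrent requests for the
+ // same user may each load it; cacheIndex() simply overwrites the entry (last one wins).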
+ startTime := time.Now()
+ l.loadAttempts.Inc()
+ idx, err := ReadIndex(ctx, l.bkt, userID, l.logger)
+ if err != nil {
+ // Cache the error, to avoid hammering the object store in case of persistent issues
+ // (e.g. a corrupted or missing bucket index).
+ l.cacheIndex(userID, nil, err)
+
+ l.loadFailures.Inc()
+ if !errors.Is(err, ErrIndexNotFound) {
+ level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err)
+ }
+
+ return nil, err
+ }
+
+ // Cache the index.
+ l.cacheIndex(userID, idx, nil)
+
+ elapsedTime := time.Since(startTime)
+ l.loadDuration.Observe(elapsedTime.Seconds())
+ level.Info(l.logger).Log("msg", "loaded bucket index", "user", userID, "duration", elapsedTime)
+ return idx, nil
+}
+
+func (l *Loader) cacheIndex(userID string, idx *Index, err error) {
+ l.indexesMx.Lock()
+ defer l.indexesMx.Unlock()
+
+ // Not an issue if, due to concurrency, another index was already cached
+ // and we overwrite it: last will win.
+ l.indexes[userID] = newCachedIndex(idx, err)
+}
+
+// checkCachedIndexes checks all cached indexes and, for each of them, does two things:
+// 1. Offload indexes that haven't been requested for at least the idle timeout
+// 2. Update indexes which haven't been updated for longer than the configured update interval
+func (l *Loader) checkCachedIndexes(ctx context.Context) error {
+ // Build a list of users for which we should update or delete the index.
+ toUpdate, toDelete := l.checkCachedIndexesToUpdateAndDelete()
+
+ // Delete unused indexes.
+ for _, userID := range toDelete {
+ l.deleteCachedIndex(userID)
+ }
+
+ // Update actively used indexes.
+ for _, userID := range toUpdate {
+ l.updateCachedIndex(ctx, userID)
+ }
+
+ // Never return error, otherwise the service terminates.
+ return nil
+}
+
+func (l *Loader) checkCachedIndexesToUpdateAndDelete() (toUpdate, toDelete []string) {
+ now := time.Now()
+
+ l.indexesMx.RLock()
+ defer l.indexesMx.RUnlock()
+
+ for userID, entry := range l.indexes {
+ switch {
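+ // The idle check takes precedence: an idle index is offloaded even if it's also stale.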
+ case now.Sub(entry.getRequestedAt()) >= l.cfg.IdleTimeout:
+ toDelete = append(toDelete, userID)
+ case entry.err != nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval:
+ toUpdate = append(toUpdate, userID)
+ case entry.err == nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval:
+ toUpdate = append(toUpdate, userID)
+ }
+ }
+
+ return
+}
+
+func (l *Loader) updateCachedIndex(ctx context.Context, userID string) {
+ readCtx, cancel := context.WithTimeout(ctx, readIndexTimeout)
+ defer cancel()
+
+ l.loadAttempts.Inc()
+ startTime := time.Now()
+ idx, err := ReadIndex(readCtx, l.bkt, userID, l.logger)
+
+ if errors.Is(err, ErrIndexNotFound) {
+ level.Info(l.logger).Log("msg", "unloaded bucket index", "user", userID, "reason", "not found during periodic check")
+
+ // Remove from cache.
+ l.indexesMx.Lock()
+ delete(l.indexes, userID)
+ l.indexesMx.Unlock()
+
+ return
+ }
+ if err != nil {
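+ // Unlike GetIndex(), a failed background update doesn't replace the cached entry,
+ // so queriers keep using the previously loaded (possibly stale) index.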
+ l.loadFailures.Inc()
+ level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err)
+ return
+ }
+
+ l.loadDuration.Observe(time.Since(startTime).Seconds())
+
+ // Cache it.
+ l.indexesMx.Lock()
+ l.indexes[userID].index = idx
+ l.indexes[userID].err = nil
+ l.indexes[userID].setUpdatedAt(startTime)
+ l.indexesMx.Unlock()
+}
+
+func (l *Loader) deleteCachedIndex(userID string) {
+ l.indexesMx.Lock()
+ delete(l.indexes, userID)
+ l.indexesMx.Unlock()
+
+ level.Info(l.logger).Log("msg", "unloaded bucket index", "user", userID, "reason", "idle")
+}
+
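+// countLoadedIndexesMetric returns the number of indexes currently cached in-memory,
+// excluding entries which only hold a cached error.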
+func (l *Loader) countLoadedIndexesMetric() float64 {
+ l.indexesMx.RLock()
+ defer l.indexesMx.RUnlock()
+
+ count := 0
+ for _, idx := range l.indexes {
+ if idx.index != nil {
+ count++
+ }
+ }
+ return float64(count)
+}
+
+type cachedIndex struct {
+ // We cache either the index or the error that occurred while fetching it. They're
+ // mutually exclusive.
+ index *Index
+ err error
+
+ // Unix timestamp (seconds) of when the index has been updated from the storage the last time.
+ updatedAt atomic.Int64
+
+ // Unix timestamp (seconds) of when the index has been requested the last time.
+ requestedAt atomic.Int64
+}
+
+func newCachedIndex(idx *Index, err error) *cachedIndex {
+ entry := &cachedIndex{
+ index: idx,
+ err: err,
+ }
+
+ now := time.Now()
+ entry.setUpdatedAt(now)
+ entry.setRequestedAt(now)
+
+ return entry
+}
+
+func (i *cachedIndex) setUpdatedAt(ts time.Time) {
+ i.updatedAt.Store(ts.Unix())
+}
+
+func (i *cachedIndex) getUpdatedAt() time.Time {
+ return time.Unix(i.updatedAt.Load(), 0)
+}
+
+func (i *cachedIndex) setRequestedAt(ts time.Time) {
+ i.requestedAt.Store(ts.Unix())
+}
+
+func (i *cachedIndex) getRequestedAt() time.Time {
+ return time.Unix(i.requestedAt.Load(), 0)
+}
diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go
new file mode 100644
index 00000000000..96490609086
--- /dev/null
+++ b/pkg/storage/tsdb/bucketindex/loader_test.go
@@ -0,0 +1,426 @@
+package bucketindex
+
+import (
+ "bytes"
+ "context"
+ "path"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/go-kit/kit/log"
+ "github.com/oklog/ulid"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/cortexproject/cortex/pkg/util/services"
+ "github.com/cortexproject/cortex/pkg/util/test"
+)
+
+func TestLoader_GetIndex_ShouldLazyLoadBucketIndex(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create a bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Create the loader.
+ loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ // Ensure no index has been loaded yet.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
+ # TYPE cortex_bucket_index_load_failures_total counter
+ cortex_bucket_index_load_failures_total 0
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 0
+ # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
+ # TYPE cortex_bucket_index_loads_total counter
+ cortex_bucket_index_loads_total 0
+ `),
+ "cortex_bucket_index_loads_total",
+ "cortex_bucket_index_load_failures_total",
+ "cortex_bucket_index_loaded",
+ ))
+
+ // Request the index multiple times.
+ for i := 0; i < 10; i++ {
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+ }
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
+ # TYPE cortex_bucket_index_load_failures_total counter
+ cortex_bucket_index_load_failures_total 0
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 1
+ # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
+ # TYPE cortex_bucket_index_loads_total counter
+ cortex_bucket_index_loads_total 1
+ `),
+ "cortex_bucket_index_loads_total",
+ "cortex_bucket_index_load_failures_total",
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_GetIndex_ShouldCacheError(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create the loader.
+ loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ // Request the index multiple times.
+ for i := 0; i < 10; i++ {
+ _, err := loader.GetIndex(ctx, "user-1")
+ require.Equal(t, ErrIndexNotFound, err)
+ }
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
+ # TYPE cortex_bucket_index_load_failures_total counter
+ cortex_bucket_index_load_failures_total 1
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 0
+ # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
+ # TYPE cortex_bucket_index_loads_total counter
+ cortex_bucket_index_loads_total 1
+ `),
+ "cortex_bucket_index_loads_total",
+ "cortex_bucket_index_load_failures_total",
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create a bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Create the loader.
+ cfg := LoaderConfig{
+ CheckInterval: time.Second,
+ UpdateOnStaleInterval: time.Second,
+ UpdateOnErrorInterval: time.Hour, // Intentionally high to not hit it.
+ IdleTimeout: time.Hour, // Intentionally high to not hit it.
+ }
+
+ loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Update the bucket index.
+ idx.Blocks = append(idx.Blocks, &Block{ID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 30})
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Wait until the index has been updated in background.
+ test.Poll(t, 3*time.Second, 2, func() interface{} {
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ if err != nil {
+ return 0
+ }
+ return len(actualIdx.Blocks)
+ })
+
+ actualIdx, err = loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
+ # TYPE cortex_bucket_index_load_failures_total counter
+ cortex_bucket_index_load_failures_total 0
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 1
+ `),
+ "cortex_bucket_index_load_failures_total",
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create the loader.
+ cfg := LoaderConfig{
+ CheckInterval: time.Second,
+ UpdateOnStaleInterval: time.Hour, // Intentionally high to not hit it.
+ UpdateOnErrorInterval: time.Second,
+ IdleTimeout: time.Hour, // Intentionally high to not hit it.
+ }
+
+ loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ _, err := loader.GetIndex(ctx, "user-1")
+ assert.Equal(t, ErrIndexNotFound, err)
+
+ // Upload the bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Wait until the index has been updated in background.
+ test.Poll(t, 3*time.Second, nil, func() interface{} {
+ _, err := loader.GetIndex(ctx, "user-1")
+ return err
+ })
+
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 1
+ `),
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create a bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Create the loader.
+ cfg := LoaderConfig{
+ CheckInterval: time.Second,
+ UpdateOnStaleInterval: time.Second,
+ UpdateOnErrorInterval: time.Second,
+ IdleTimeout: time.Hour, // Intentionally high to not hit it.
+ }
+
+ loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Write a corrupted index.
+ require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}")))
+
+ // Wait until the first failure has been tracked.
+ test.Poll(t, 3*time.Second, true, func() interface{} {
+ return testutil.ToFloat64(loader.loadFailures) > 0
+ })
+
+ actualIdx, err = loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 1
+ `),
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create a bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Create the loader.
+ cfg := LoaderConfig{
+ CheckInterval: time.Second,
+ UpdateOnStaleInterval: time.Second,
+ UpdateOnErrorInterval: time.Second,
+ IdleTimeout: time.Hour, // Intentionally high to not hit it.
+ }
+
+ loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Delete the index
+ require.NoError(t, DeleteIndex(ctx, bkt, "user-1"))
+
+ // Wait until the index is offloaded.
+ test.Poll(t, 3*time.Second, float64(0), func() interface{} {
+ return testutil.ToFloat64(loader.loaded)
+ })
+
+ _, err = loader.GetIndex(ctx, "user-1")
+ require.Equal(t, ErrIndexNotFound, err)
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 0
+ `),
+ "cortex_bucket_index_loaded",
+ ))
+}
+
+func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates(t *testing.T) {
+ ctx := context.Background()
+ reg := prometheus.NewPedanticRegistry()
+ bkt := prepareFilesystemBucket(t)
+
+ // Create a bucket index.
+ idx := &Index{
+ Version: IndexVersion1,
+ Blocks: Blocks{
+ {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
+ },
+ BlockDeletionMarks: nil,
+ UpdatedAt: time.Now().Unix(),
+ }
+ require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
+
+ // Create the loader.
+ cfg := LoaderConfig{
+ CheckInterval: time.Second,
+ UpdateOnStaleInterval: time.Second,
+ UpdateOnErrorInterval: time.Second,
+ IdleTimeout: 0, // Offload at first check.
+ }
+
+ loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
+ require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
+ })
+
+ actualIdx, err := loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Wait until the index is offloaded.
+ test.Poll(t, 3*time.Second, float64(0), func() interface{} {
+ return testutil.ToFloat64(loader.loaded)
+ })
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
+ # TYPE cortex_bucket_index_loaded gauge
+ cortex_bucket_index_loaded 0
+ # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
+ # TYPE cortex_bucket_index_loads_total counter
+ cortex_bucket_index_loads_total 1
+ `),
+ "cortex_bucket_index_loaded",
+ "cortex_bucket_index_loads_total",
+ ))
+
+ // Load it again.
+ actualIdx, err = loader.GetIndex(ctx, "user-1")
+ require.NoError(t, err)
+ assert.Equal(t, idx, actualIdx)
+
+ // Ensure metrics have been updated accordingly.
+ assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+ # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
+ # TYPE cortex_bucket_index_loads_total counter
+ cortex_bucket_index_loads_total 2
+ `),
+ "cortex_bucket_index_loads_total",
+ ))
+}
+
+func prepareLoaderConfig() LoaderConfig {
+ return LoaderConfig{
+ CheckInterval: time.Minute,
+ UpdateOnStaleInterval: 15 * time.Minute,
+ UpdateOnErrorInterval: time.Minute,
+ IdleTimeout: time.Hour,
+ }
+}
diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go
index c4550ec899a..66f22fa11ac 100644
--- a/pkg/storage/tsdb/caching_bucket.go
+++ b/pkg/storage/tsdb/caching_bucket.go
@@ -77,6 +77,8 @@ type MetadataCacheConfig struct {
MetafileContentTTL time.Duration `yaml:"metafile_content_ttl"`
MetafileMaxSize int `yaml:"metafile_max_size_bytes"`
MetafileAttributesTTL time.Duration `yaml:"metafile_attributes_ttl"`
+ BucketIndexContentTTL time.Duration `yaml:"bucket_index_content_ttl"`
+ BucketIndexMaxSize int `yaml:"bucket_index_max_size_bytes"`
}
func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
@@ -90,8 +92,10 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix
f.DurationVar(&cfg.MetafileExistsTTL, prefix+"metafile-exists-ttl", 2*time.Hour, "How long to cache information that block metafile exists. Also used for user deletion mark file.")
f.DurationVar(&cfg.MetafileDoesntExistTTL, prefix+"metafile-doesnt-exist-ttl", 5*time.Minute, "How long to cache information that block metafile doesn't exist. Also used for user deletion mark file.")
f.DurationVar(&cfg.MetafileContentTTL, prefix+"metafile-content-ttl", 24*time.Hour, "How long to cache content of the metafile.")
- f.IntVar(&cfg.MetafileMaxSize, prefix+"metafile-max-size-bytes", 1*1024*1024, "Maximum size of metafile content to cache in bytes.")
+ f.IntVar(&cfg.MetafileMaxSize, prefix+"metafile-max-size-bytes", 1*1024*1024, "Maximum size of metafile content to cache in bytes. Caching will be skipped if the content exceeds this size. This is useful to avoid a network round trip for large content if the configured caching backend has a hard limit on cached item size (in this case, you should set this limit to match the limit in the caching backend).")
f.DurationVar(&cfg.MetafileAttributesTTL, prefix+"metafile-attributes-ttl", 168*time.Hour, "How long to cache attributes of the block metafile.")
+ f.DurationVar(&cfg.BucketIndexContentTTL, prefix+"bucket-index-content-ttl", 5*time.Minute, "How long to cache content of the bucket index.")
+ f.IntVar(&cfg.BucketIndexMaxSize, prefix+"bucket-index-max-size-bytes", 1*1024*1024, "Maximum size of bucket index content to cache in bytes. Caching will be skipped if the content exceeds this size. This is useful to avoid a network round trip for large content if the configured caching backend has a hard limit on cached item size (in this case, you should set this limit to match the limit in the caching backend).")
}
func (cfg *MetadataCacheConfig) Validate() error {
@@ -123,6 +127,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL)
+ cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFile, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
codec := snappyIterCodec{storecache.JSONIterCodec{}}
cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec)
@@ -165,6 +170,11 @@ func isMetaFile(name string) bool {
return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkPath)
}
+func isBucketIndexFile(name string) bool {
+ // TODO can't reference bucketindex because of a circular dependency. To be fixed.
+ return strings.HasSuffix(name, "/bucket-index.json.gz")
+}
+
func isTenantsDir(name string) bool {
return name == ""
}
diff --git a/pkg/storage/tsdb/caching_bucket_test.go b/pkg/storage/tsdb/caching_bucket_test.go
index 8f823b24098..41d5cf49b66 100644
--- a/pkg/storage/tsdb/caching_bucket_test.go
+++ b/pkg/storage/tsdb/caching_bucket_test.go
@@ -13,3 +13,11 @@ func TestIsTenantDir(t *testing.T) {
assert.False(t, isTenantBlocksDir("test/block"))
assert.False(t, isTenantBlocksDir("test/block/chunks"))
}
+
+func TestIsBucketIndexFile(t *testing.T) {
+ assert.False(t, isBucketIndexFile(""))
+ assert.False(t, isBucketIndexFile("test"))
+ assert.False(t, isBucketIndexFile("test/block"))
+ assert.False(t, isBucketIndexFile("test/block/chunks"))
+ assert.True(t, isBucketIndexFile("test/bucket-index.json.gz"))
+}
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 5105a79d716..2f721f28e8e 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -50,7 +50,7 @@ var (
//nolint:golint
type BlocksStorageConfig struct {
Bucket bucket.Config `yaml:",inline"`
- BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the store-gateway synchronizes blocks stored in the bucket."`
+ BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the querier and store-gateway discover and synchronize blocks stored in the bucket."`
TSDB TSDBConfig `yaml:"tsdb"`
}
@@ -206,7 +206,7 @@ func (cfg *TSDBConfig) BlocksDir(userID string) string {
return filepath.Join(cfg.Dir, userID)
}
-// BucketStoreConfig holds the config information for Bucket Stores used by the querier
+// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
type BucketStoreConfig struct {
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval"`
@@ -220,6 +220,7 @@ type BucketStoreConfig struct {
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
+ BucketIndex BucketIndexConfig `yaml:"bucket_index"`
// Controls whether index-header lazy loading is enabled. This config option is hidden
// while it is marked as experimental.
@@ -239,6 +240,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.IndexCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-cache.")
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
+ cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")
f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.")
f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 5*time.Minute, "How frequently to scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.")
@@ -272,3 +274,19 @@ func (cfg *BucketStoreConfig) Validate() error {
}
return nil
}
+
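+// BucketIndexConfig configures whether and how the querier loads and caches the per-tenant bucket index.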
+type BucketIndexConfig struct {
+ Enabled bool `yaml:"enabled"`
+ UpdateOnStaleInterval time.Duration `yaml:"update_on_stale_interval"`
+ UpdateOnErrorInterval time.Duration `yaml:"update_on_error_interval"`
+ IdleTimeout time.Duration `yaml:"idle_timeout"`
+ MaxStalePeriod time.Duration `yaml:"max_stale_period"`
+}
+
+func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+ f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "True to enable the querier to discover blocks in the storage via the bucket index instead of bucket scanning.")
+ f.DurationVar(&cfg.UpdateOnStaleInterval, prefix+"update-on-stale-interval", 15*time.Minute, "How frequently a cached bucket index should be refreshed.")
+ f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently the querier should retry loading a bucket index which previously failed to load.")
+ f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long an unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache.")
+ f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).")
+}
diff --git a/website/static/images/blocks-storage/bucket-index-querier-logic.png b/website/static/images/blocks-storage/bucket-index-querier-logic.png
new file mode 100644
index 00000000000..baaa2865714
Binary files /dev/null and b/website/static/images/blocks-storage/bucket-index-querier-logic.png differ