Skip to content

Commit d99a95e

Browse files
committed
add parquet labels cache
Signed-off-by: yeya24 <[email protected]>
1 parent e551a2e commit d99a95e

File tree

6 files changed

+90
-21
lines changed

6 files changed

+90
-21
lines changed

integration/parquet_querier_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/thanos-io/thanos/pkg/block/metadata"
2121

2222
"github.com/cortexproject/cortex/integration/e2e"
23+
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
2324
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
2425
"github.com/cortexproject/cortex/integration/e2ecortex"
2526
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -35,7 +36,8 @@ func TestParquetFuzz(t *testing.T) {
3536
defer s.Close()
3637

3738
consul := e2edb.NewConsulWithName("consul")
38-
require.NoError(t, s.StartAndWaitReady(consul))
39+
memcached := e2ecache.NewMemcached()
40+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
3941

4042
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
4143
flags := mergeFlags(
@@ -72,6 +74,11 @@ func TestParquetFuzz(t *testing.T) {
7274
"-parquet-converter.enabled": "true",
7375
// Querier
7476
"-querier.enable-parquet-queryable": "true",
77+
// Enable cache for parquet labels and chunks
78+
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
79+
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
80+
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
81+
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
7582
},
7683
)
7784

pkg/querier/bucket.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func createCachingBucketClient(ctx context.Context, storageCfg cortex_tsdb.Block
2222

2323
// Blocks finder doesn't use chunks, but we pass config for consistency.
2424
matchers := cortex_tsdb.NewMatchers()
25-
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
25+
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
2626
if err != nil {
2727
return nil, errors.Wrap(err, "create caching bucket")
2828
}

pkg/storage/tsdb/caching_bucket.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,39 @@ func (cfg *MetadataCacheConfig) Validate() error {
186186
return cfg.BucketCacheBackend.Validate()
187187
}
188188

189-
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
189+
// ParquetLabelsCacheConfig configures the cache used for ranged reads of parquet labels files.
// It inlines the shared bucket cache backend settings and adds the subrange and TTL options
// that RegisterFlagsWithPrefix exposes as flags.
type ParquetLabelsCacheConfig struct {
190+
BucketCacheBackend `yaml:",inline"`
191+
192+
// Size of each subrange that a bucket object is split into for caching.
SubrangeSize int64 `yaml:"subrange_size"`
193+
// Maximum number of sub-GetRange requests a single GetRange request may be split into; zero or negative means unlimited.
MaxGetRangeRequests int `yaml:"max_get_range_requests"`
194+
// TTL for cached object attributes of parquet labels files.
AttributesTTL time.Duration `yaml:"attributes_ttl"`
195+
// TTL for cached individual subranges.
SubrangeTTL time.Duration `yaml:"subrange_ttl"`
196+
}
197+
198+
func (cfg *ParquetLabelsCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
199+
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The parquet labels cache backend type. Single or Multiple cache backend can be provided. "+
200+
"Supported values in single cache: %s, %s, %s, and '' (disable). "+
201+
"Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedBucketCacheBackends, ", ")))
202+
203+
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
204+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
205+
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-labels")
206+
cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
207+
208+
f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
209+
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching parquet labels file. Zero or negative value = unlimited number of sub-requests.")
210+
f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for parquet labels file.")
211+
f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual subranges.")
212+
213+
// In the multi level parquet labels cache, backfill TTL follows subrange TTL
214+
cfg.MultiLevel.BackFillTTL = cfg.SubrangeTTL
215+
}
216+
217+
func (cfg *ParquetLabelsCacheConfig) Validate() error {
218+
return cfg.BucketCacheBackend.Validate()
219+
}
220+
221+
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
190222
cfg := cache.NewCachingBucketConfig()
191223
cachingConfigured := false
192224

@@ -221,6 +253,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
221253
cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec, "")
222254
}
223255

256+
parquetLabelsCache, err := createBucketCache("parquet-labels-cache", &parquetLabelsConfig.BucketCacheBackend, logger, reg)
257+
if err != nil {
258+
return nil, errors.Wrapf(err, "parquet-labels-cache")
259+
}
260+
if parquetLabelsCache != nil {
261+
cachingConfigured = true
262+
parquetLabelsCache = cache.NewTracingCache(parquetLabelsCache)
263+
cfg.CacheGetRange("parquet-labels", parquetLabelsCache, matchers.GetParquetLabelsMatcher(), parquetLabelsConfig.SubrangeSize, parquetLabelsConfig.AttributesTTL, parquetLabelsConfig.SubrangeTTL, parquetLabelsConfig.MaxGetRangeRequests)
264+
}
265+
224266
if !cachingConfigured {
225267
// No caching is configured.
226268
return bkt, nil
@@ -316,6 +358,7 @@ func NewMatchers() Matchers {
316358
matcherMap := make(map[string]func(string) bool)
317359
matcherMap["chunks"] = isTSDBChunkFile
318360
matcherMap["parquet-chunks"] = isParquetChunkFile
361+
matcherMap["parquet-labels"] = isParquetLabelsFile
319362
matcherMap["metafile"] = isMetaFile
320363
matcherMap["block-index"] = isBlockIndexFile
321364
matcherMap["bucket-index"] = isBucketIndexFiles
@@ -339,6 +382,10 @@ func (m *Matchers) SetParquetChunksMatcher(f func(string) bool) {
339382
m.matcherMap["parquet-chunks"] = f
340383
}
341384

385+
func (m *Matchers) SetParquetLabelsMatcher(f func(string) bool) {
386+
m.matcherMap["parquet-labels"] = f
387+
}
388+
342389
// SetBlockIndexMatcher stores f as the matcher registered under the
// "block-index" key, replacing any previous one.
func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
	const key = "block-index"
	m.matcherMap[key] = f
}
@@ -367,6 +414,10 @@ func (m *Matchers) GetParquetChunksMatcher() func(string) bool {
367414
return m.matcherMap["parquet-chunks"]
368415
}
369416

417+
func (m *Matchers) GetParquetLabelsMatcher() func(string) bool {
418+
return m.matcherMap["parquet-labels"]
419+
}
420+
370421
// GetMetafileMatcher returns the matcher registered under the "metafile" key
// (nil if none is registered).
func (m *Matchers) GetMetafileMatcher() func(string) bool {
	matcher := m.matcherMap["metafile"]
	return matcher
}
@@ -397,6 +448,8 @@ func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name)
397448

398449
// isParquetChunkFile reports whether name ends with "chunks.parquet".
func isParquetChunkFile(name string) bool {
	const suffix = "chunks.parquet"
	return strings.HasSuffix(name, suffix)
}
399450

451+
// isParquetLabelsFile reports whether name ends with "labels.parquet".
func isParquetLabelsFile(name string) bool {
	const suffix = "labels.parquet"
	return strings.HasSuffix(name, suffix)
}
452+
400453
func isMetaFile(name string) bool {
401454
return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile)
402455
}

pkg/storage/tsdb/config.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -274,23 +274,24 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool {
274274

275275
// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
276276
type BucketStoreConfig struct {
277-
SyncDir string `yaml:"sync_dir"`
278-
SyncInterval time.Duration `yaml:"sync_interval"`
279-
MaxConcurrent int `yaml:"max_concurrent"`
280-
MaxInflightRequests int `yaml:"max_inflight_requests"`
281-
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
282-
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
283-
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
284-
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
285-
IndexCache IndexCacheConfig `yaml:"index_cache"`
286-
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
287-
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
288-
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
289-
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
290-
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
291-
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
292-
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
293-
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
277+
SyncDir string `yaml:"sync_dir"`
278+
SyncInterval time.Duration `yaml:"sync_interval"`
279+
MaxConcurrent int `yaml:"max_concurrent"`
280+
MaxInflightRequests int `yaml:"max_inflight_requests"`
281+
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
282+
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
283+
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
284+
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
285+
IndexCache IndexCacheConfig `yaml:"index_cache"`
286+
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
287+
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
288+
ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache" doc:"hidden"`
289+
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
290+
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
291+
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
292+
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
293+
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
294+
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
294295

295296
// Chunk pool.
296297
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
@@ -348,6 +349,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
348349
cfg.IndexCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-cache.")
349350
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
350351
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
352+
cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.")
351353
cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")
352354

353355
f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.")
@@ -403,6 +405,10 @@ func (cfg *BucketStoreConfig) Validate() error {
403405
if err != nil {
404406
return errors.Wrap(err, "metadata-cache configuration")
405407
}
408+
err = cfg.ParquetLabelsCache.Validate()
409+
if err != nil {
410+
return errors.Wrap(err, "parquet-labels-cache configuration")
411+
}
406412
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
407413
return ErrInvalidBucketIndexBlockDiscoveryStrategy
408414
}

pkg/storage/tsdb/multilevel_bucket_cache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg
6767
case "metadata-cache":
6868
itemName = "metadata_cache"
6969
metricHelpText = "metadata cache"
70+
case "parquet-labels-cache":
71+
itemName = "parquet_labels_cache"
72+
metricHelpText = "parquet labels cache"
7073
default:
7174
itemName = name
7275
}

pkg/storegateway/bucket_stores.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many
101101
// NewBucketStores makes a new BucketStores.
102102
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
103103
matchers := tsdb.NewMatchers()
104-
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg)
104+
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
105105
if err != nil {
106106
return nil, errors.Wrapf(err, "create caching bucket")
107107
}

0 commit comments

Comments
 (0)