diff --git a/CHANGELOG.md b/CHANGELOG.md index 413b6cdb7c1..28905e2c7a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 * [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805 +* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index c52b326cb03..fa26195b9dc 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -72,6 +72,7 @@ type BlocksCleaner struct { blocksFailedTotal prometheus.Counter blocksMarkedForDeletion *prometheus.CounterVec tenantBlocks *prometheus.GaugeVec + tenantParquetBlocks *prometheus.GaugeVec tenantBlocksMarkedForDelete *prometheus.GaugeVec tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec tenantPartialBlocks *prometheus.GaugeVec @@ -154,6 +155,10 @@ func NewBlocksCleaner( Name: "cortex_bucket_blocks_count", Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.", }, commonLabels), + tenantParquetBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_bucket_parquet_blocks_count", + Help: "Total number of parquet blocks in the bucket. 
Blocks marked for deletion are included.", + }, commonLabels), tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_marked_for_deletion_count", Help: "Total number of blocks marked for deletion in the bucket.", @@ -354,6 +359,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro for _, userID := range c.lastOwnedUsers { if !isActive[userID] && !isMarkedForDeletion[userID] { c.tenantBlocks.DeleteLabelValues(userID) + c.tenantParquetBlocks.DeleteLabelValues(userID) c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) @@ -451,6 +457,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog // Given all blocks have been deleted, we can also remove the metrics. c.tenantBlocks.DeleteLabelValues(userID) + c.tenantParquetBlocks.DeleteLabelValues(userID) c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) @@ -602,7 +609,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us begin = time.Now() w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) - if c.cfgProvider.ParquetConverterEnabled(userID) { + parquetEnabled := c.cfgProvider.ParquetConverterEnabled(userID) + if parquetEnabled { w.EnableParquet() } @@ -676,6 +684,9 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() + if parquetEnabled { + c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks()))) + } if 
c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { begin = time.Now() diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index befa6f9787a..81374d5ce56 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -163,7 +163,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions createBlockVisitMarker(t, bucketClient, "user-1", block11) // Partial block only has visit marker. createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold. - // Blocks for user-3, marked for deletion. + // Blocks for user-3, tenant marked for deletion. require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now()))) block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil) block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil) @@ -206,6 +206,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions }, bucketClient, logger, reg) require.NoError(t, err) cfgProvider := newMockConfigProvider() + cfgProvider.parquetConverterEnabled = map[string]bool{ + "user-3": true, + "user-5": true, + "user-6": true, + } blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, @@ -333,8 +338,13 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions cortex_bucket_blocks_partials_count{user="user-2"} 0 cortex_bucket_blocks_partials_count{user="user-5"} 0 cortex_bucket_blocks_partials_count{user="user-6"} 0 + # HELP cortex_bucket_parquet_blocks_count Total number of parquet blocks in the bucket. Blocks marked for deletion are included. 
+ # TYPE cortex_bucket_parquet_blocks_count gauge + cortex_bucket_parquet_blocks_count{user="user-5"} 0 + cortex_bucket_parquet_blocks_count{user="user-6"} 1 `), "cortex_bucket_blocks_count", + "cortex_bucket_parquet_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_marked_for_no_compaction_count", "cortex_bucket_blocks_partials_count", @@ -1013,16 +1023,21 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { } type mockConfigProvider struct { - userRetentionPeriods map[string]time.Duration + userRetentionPeriods map[string]time.Duration + parquetConverterEnabled map[string]bool } func (m *mockConfigProvider) ParquetConverterEnabled(userID string) bool { + if result, ok := m.parquetConverterEnabled[userID]; ok { + return result + } return false } func newMockConfigProvider() *mockConfigProvider { return &mockConfigProvider{ - userRetentionPeriods: make(map[string]time.Duration), + userRetentionPeriods: make(map[string]time.Duration), + parquetConverterEnabled: make(map[string]bool), } } diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 80c44b81b0b..80c6bf7225a 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -88,6 +88,12 @@ type Converter struct { fetcherMetrics *block.FetcherMetrics baseConverterOptions []convert.ConvertOption + + metrics *metrics + + // Keep track of the last owned users. + // This is not thread safe now. 
+ lastOwnedUsers map[string]struct{} } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -125,6 +131,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex pool: chunkenc.NewPool(), blockRanges: blockRanges, fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil), + metrics: newMetrics(registerer), bkt: bkt, baseConverterOptions: []convert.ConvertOption{ convert.WithSortBy(labels.MetricName), @@ -194,6 +201,9 @@ func (c *Converter) running(ctx context.Context) error { for _, userID := range users { if !c.limits.ParquetConverterEnabled(userID) { + // It is possible that parquet is disabled for the userID so we + // need to check if the user was owned last time. + c.cleanupMetricsForNotOwnedUser(userID) continue } @@ -211,6 +221,7 @@ func (c *Converter) running(ctx context.Context) error { continue } if !owned { + c.cleanupMetricsForNotOwnedUser(userID) continue } @@ -218,6 +229,7 @@ func (c *Converter) running(ctx context.Context) error { level.Warn(userLogger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) continue } else if markedForDeletion { + c.metrics.deleteMetricsForTenant(userID) level.Info(userLogger).Log("msg", "skipping user because it is marked for deletion", "user", userID) continue } @@ -229,6 +241,8 @@ func (c *Converter) running(ctx context.Context) error { level.Error(userLogger).Log("msg", "failed to convert user", "err", err) } } + c.lastOwnedUsers = ownedUsers + c.metrics.ownedUsers.Set(float64(len(ownedUsers))) // Delete local files for unowned tenants, if there are any. This cleans up // leftover local files for tenants that belong to different converter now, @@ -269,8 +283,15 @@ func (c *Converter) stopping(_ error) error { } func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) { - // Only active users are considered. - active, _, _, err := c.usersScanner.ScanUsers(ctx) + // Only active users are considered for conversion. 
+ // We still check deleting and deleted users just to clean up metrics. + active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx) + for _, userID := range deleting { + c.cleanupMetricsForNotOwnedUser(userID) + } + for _, userID := range deleted { + c.cleanupMetricsForNotOwnedUser(userID) + } return active, err } @@ -378,6 +399,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin } level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) + start := time.Now() converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String())) @@ -397,14 +419,18 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin _ = tsdbBlock.Close() if err != nil { - level.Error(logger).Log("msg", "Error converting block", "err", err) + level.Error(logger).Log("msg", "Error converting block", "block", b.ULID.String(), "err", err) continue } + duration := time.Since(start) + c.metrics.convertBlockDuration.WithLabelValues(userID).Set(duration.Seconds()) + level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration) - err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket) - if err != nil { - level.Error(logger).Log("msg", "Error writing block", "err", err) + if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket); err != nil { + level.Error(logger).Log("msg", "Error writing block", "block", b.ULID.String(), "err", err) + continue } + c.metrics.convertedBlocks.WithLabelValues(userID).Inc() } return nil @@ -442,6 +468,12 @@ func (c *Converter) ownBlock(ring ring.ReadRing, blockId string) (bool, error) { return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil } +func (c *Converter) cleanupMetricsForNotOwnedUser(userID string) { + if _, ok := c.lastOwnedUsers[userID]; ok { + c.metrics.deleteMetricsForTenant(userID) + } +} + func (c *Converter) compactRootDir() string { return filepath.Join(c.cfg.DataDir, "compact") } 
diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index c1795592ecd..b5efc543638 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" @@ -96,6 +97,11 @@ func TestConverter(t *testing.T) { return len(blocksConverted) }) + // Verify metrics after conversion + require.Equal(t, float64(len(blocksConverted)), testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) + require.Greater(t, testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)), 0.0) + require.Equal(t, 1.0, testutil.ToFloat64(c.metrics.ownedUsers)) + // Verify all files are there for _, block := range blocksConverted { for _, file := range []string{ @@ -121,6 +127,20 @@ func TestConverter(t *testing.T) { test.Poll(t, time.Minute, 0, func() interface{} { return len(c.listTenantsWithMetaSyncDirectories()) }) + + // Verify metrics after user deletion + test.Poll(t, time.Minute*10, true, func() interface{} { + if testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)) != 0.0 { + return false + } + if testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)) != 0.0 { + return false + } + if testutil.ToFloat64(c.metrics.ownedUsers) != 0.0 { + return false + } + return true + }) } func prepareConfig() Config { @@ -159,3 +179,42 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides, scanner) return c, logger, registry } + +func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) { + // Create a new registry for testing + reg := 
prometheus.NewRegistry() + + // Create a new converter with test configuration + cfg := Config{} + storageCfg := cortex_tsdb.BlocksStorageConfig{} + limits := &validation.Overrides{} + converter := newConverter(cfg, nil, storageCfg, []int64{7200000}, nil, reg, limits, nil) + + // Add some test metrics for a user + userID := "test-user" + converter.metrics.convertedBlocks.WithLabelValues(userID).Inc() + converter.metrics.convertBlockDuration.WithLabelValues(userID).Set(1.0) + + // Verify metrics exist before cleanup + assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID))) + assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID))) + + // Set lastOwnedUsers to empty (user was never owned) + converter.lastOwnedUsers = map[string]struct{}{} + // Clean up metrics for the user will do nothing as the user was never owned + converter.cleanupMetricsForNotOwnedUser(userID) + assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID))) + assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID))) + + // Mark the user as previously owned + converter.lastOwnedUsers = map[string]struct{}{ + userID: {}, + } + + // Clean up metrics for the user + converter.cleanupMetricsForNotOwnedUser(userID) + + // Verify metrics are deleted + assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID))) + assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID))) +} diff --git a/pkg/parquetconverter/metrics.go b/pkg/parquetconverter/metrics.go new file mode 100644 index 00000000000..296a995de6a --- /dev/null +++ b/pkg/parquetconverter/metrics.go @@ -0,0 +1,34 @@ +package parquetconverter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type metrics struct { + 
convertedBlocks *prometheus.CounterVec + convertBlockDuration *prometheus.GaugeVec + ownedUsers prometheus.Gauge +} + +func newMetrics(reg prometheus.Registerer) *metrics { + return &metrics{ + convertedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_converter_converted_blocks_total", + Help: "Total number of converted blocks per user.", + }, []string{"user"}), + convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_parquet_converter_convert_block_duration_seconds", + Help: "Time taken for the latest block conversion for the user.", + }, []string{"user"}), + ownedUsers: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_parquet_converter_users_owned", + Help: "Number of users that the parquet converter owns.", + }), + } +} + +func (m *metrics) deleteMetricsForTenant(userID string) { + m.convertedBlocks.DeleteLabelValues(userID) + m.convertBlockDuration.DeleteLabelValues(userID) +} diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 6a32b0e567b..f0075493ab3 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -70,6 +70,16 @@ func (idx *Index) IsEmpty() bool { return len(idx.Blocks) == 0 && len(idx.BlockDeletionMarks) == 0 } +func (idx *Index) ParquetBlocks() []*Block { + blocks := make([]*Block, 0, len(idx.Blocks)) + for _, b := range idx.Blocks { + if b.Parquet != nil { + blocks = append(blocks, b) + } + } + return blocks +} + // Block holds the information about a block in the index. type Block struct { // Block ID. 
diff --git a/pkg/storage/tsdb/bucketindex/index_test.go b/pkg/storage/tsdb/bucketindex/index_test.go index efa068619e7..c2beb59c32e 100644 --- a/pkg/storage/tsdb/bucketindex/index_test.go +++ b/pkg/storage/tsdb/bucketindex/index_test.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/parquet" ) func TestIndex_RemoveBlock(t *testing.T) { @@ -407,3 +409,63 @@ func TestBlockDeletionMarks_Clone(t *testing.T) { orig[0].DeletionTime = -1 assert.Equal(t, int64(1), clone[0].DeletionTime) } + +func TestIndex_ParquetBlocks(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + + tests := map[string]struct { + index *Index + expected []*Block + }{ + "empty index": { + index: &Index{}, + expected: []*Block{}, + }, + "no parquet blocks": { + index: &Index{ + Blocks: Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + }, + }, + expected: []*Block{}, + }, + "some parquet blocks": { + index: &Index{ + Blocks: Blocks{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block2}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, + }, + expected: []*Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, + }, + "all parquet blocks": { + index: &Index{ + Blocks: Blocks{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, + }, + expected: []*Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, + }, + } + + for testName, testData := range tests { + 
t.Run(testName, func(t *testing.T) { + actual := testData.index.ParquetBlocks() + assert.Equal(t, testData.expected, actual) + }) + } +}