Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
* [ENHANCEMENT] Parquet Storage: Add metrics for parquet blocks and the parquet converter. #6809
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
13 changes: 12 additions & 1 deletion pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type BlocksCleaner struct {
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion *prometheus.CounterVec
tenantBlocks *prometheus.GaugeVec
tenantParquetBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
Expand Down Expand Up @@ -154,6 +155,10 @@ func NewBlocksCleaner(
Name: "cortex_bucket_blocks_count",
Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
}, commonLabels),
tenantParquetBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_parquet_blocks_count",
Help: "Total number of parquet blocks in the bucket. Blocks marked for deletion are included.",
}, commonLabels),
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_deletion_count",
Help: "Total number of blocks marked for deletion in the bucket.",
Expand Down Expand Up @@ -354,6 +359,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
for _, userID := range c.lastOwnedUsers {
if !isActive[userID] && !isMarkedForDeletion[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
Expand Down Expand Up @@ -451,6 +457,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog

// Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
Expand Down Expand Up @@ -602,7 +609,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
begin = time.Now()
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)

if c.cfgProvider.ParquetConverterEnabled(userID) {
parquetEnabled := c.cfgProvider.ParquetConverterEnabled(userID)
if parquetEnabled {
w.EnableParquet()
}

Expand Down Expand Up @@ -676,6 +684,9 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
if parquetEnabled {
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
}

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
Expand Down
21 changes: 18 additions & 3 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
createBlockVisitMarker(t, bucketClient, "user-1", block11) // Partial block only has visit marker.
createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.

// Blocks for user-3, marked for deletion.
// Blocks for user-3, tenant marked for deletion.
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now())))
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)
Expand Down Expand Up @@ -206,6 +206,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
cfgProvider.parquetConverterEnabled = map[string]bool{
"user-3": true,
"user-5": true,
"user-6": true,
}
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
Expand Down Expand Up @@ -333,8 +338,13 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
cortex_bucket_blocks_partials_count{user="user-2"} 0
cortex_bucket_blocks_partials_count{user="user-5"} 0
cortex_bucket_blocks_partials_count{user="user-6"} 0
# HELP cortex_bucket_parquet_blocks_count Total number of parquet blocks in the bucket. Blocks marked for deletion are included.
# TYPE cortex_bucket_parquet_blocks_count gauge
cortex_bucket_parquet_blocks_count{user="user-5"} 0
cortex_bucket_parquet_blocks_count{user="user-6"} 1
`),
"cortex_bucket_blocks_count",
"cortex_bucket_parquet_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_marked_for_no_compaction_count",
"cortex_bucket_blocks_partials_count",
Expand Down Expand Up @@ -1013,16 +1023,21 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
}

// mockConfigProvider is a test double for the compactor's per-tenant config
// provider. Zero values (absent map keys) mean "retention disabled" and
// "parquet converter disabled" respectively.
type mockConfigProvider struct {
	userRetentionPeriods    map[string]time.Duration
	parquetConverterEnabled map[string]bool
}

// ParquetConverterEnabled returns whether the parquet converter is enabled
// for the given user. Users not present in the map default to disabled.
func (m *mockConfigProvider) ParquetConverterEnabled(userID string) bool {
	// A lookup of a missing key returns the zero value (false), which is
	// exactly the desired default, so no explicit existence check is needed.
	return m.parquetConverterEnabled[userID]
}

// newMockConfigProvider returns a mockConfigProvider with all per-tenant
// maps initialized (empty), so tests can assign entries directly.
func newMockConfigProvider() *mockConfigProvider {
	return &mockConfigProvider{
		userRetentionPeriods:    make(map[string]time.Duration),
		parquetConverterEnabled: make(map[string]bool),
	}
}

Expand Down
44 changes: 38 additions & 6 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ type Converter struct {
fetcherMetrics *block.FetcherMetrics

baseConverterOptions []convert.ConvertOption

metrics *metrics

// Keep track of the last owned users.
// This is not thread safe now.
lastOwnedUsers map[string]struct{}
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down Expand Up @@ -125,6 +131,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
pool: chunkenc.NewPool(),
blockRanges: blockRanges,
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithSortBy(labels.MetricName),
Expand Down Expand Up @@ -194,6 +201,9 @@ func (c *Converter) running(ctx context.Context) error {

for _, userID := range users {
if !c.limits.ParquetConverterEnabled(userID) {
// It is possible that parquet is disabled for the userID so we
// need to check if the user was owned last time.
c.cleanupMetricsForNotOwnedUser(userID)
continue
}

Expand All @@ -211,13 +221,15 @@ func (c *Converter) running(ctx context.Context) error {
continue
}
if !owned {
c.cleanupMetricsForNotOwnedUser(userID)
continue
}

if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bkt, userID); err != nil {
level.Warn(userLogger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err)
continue
} else if markedForDeletion {
c.metrics.deleteMetricsForTenant(userID)
level.Info(userLogger).Log("msg", "skipping user because it is marked for deletion", "user", userID)
continue
}
Expand All @@ -229,6 +241,8 @@ func (c *Converter) running(ctx context.Context) error {
level.Error(userLogger).Log("msg", "failed to convert user", "err", err)
}
}
c.lastOwnedUsers = ownedUsers
c.metrics.ownedUsers.Set(float64(len(ownedUsers)))

// Delete local files for unowned tenants, if there are any. This cleans up
// leftover local files for tenants that belong to different converter now,
Expand Down Expand Up @@ -269,8 +283,15 @@ func (c *Converter) stopping(_ error) error {
}

func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) {
// Only active users are considered.
active, _, _, err := c.usersScanner.ScanUsers(ctx)
// Only active users are considered for conversion.
// We still check deleting and deleted users just to clean up metrics.
active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx)
for _, userID := range deleting {
c.cleanupMetricsForNotOwnedUser(userID)
}
for _, userID := range deleted {
c.cleanupMetricsForNotOwnedUser(userID)
}
return active, err
}

Expand Down Expand Up @@ -378,6 +399,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
}

level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
start := time.Now()

converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))

Expand All @@ -397,14 +419,18 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
_ = tsdbBlock.Close()

if err != nil {
level.Error(logger).Log("msg", "Error converting block", "err", err)
level.Error(logger).Log("msg", "Error converting block", "block", b.ULID.String(), "err", err)
continue
}
duration := time.Since(start)
c.metrics.convertBlockDuration.WithLabelValues(userID).Set(duration.Seconds())
level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration)

err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)
if err != nil {
level.Error(logger).Log("msg", "Error writing block", "err", err)
if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket); err != nil {
level.Error(logger).Log("msg", "Error writing block", "block", b.ULID.String(), "err", err)
continue
}
c.metrics.convertedBlocks.WithLabelValues(userID).Inc()
}

return nil
Expand Down Expand Up @@ -442,6 +468,12 @@ func (c *Converter) ownBlock(ring ring.ReadRing, blockId string) (bool, error) {
return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
}

// cleanupMetricsForNotOwnedUser deletes the per-user metric series for
// userID, but only when this converter owned the user in the previous
// iteration; otherwise there is nothing registered to clean up.
func (c *Converter) cleanupMetricsForNotOwnedUser(userID string) {
	_, wasOwned := c.lastOwnedUsers[userID]
	if !wasOwned {
		return
	}
	c.metrics.deleteMetricsForTenant(userID)
}

// compactRootDir returns the scratch directory used for compaction work,
// located under the configured data directory.
func (c *Converter) compactRootDir() string {
	const compactSubDir = "compact"
	return filepath.Join(c.cfg.DataDir, compactSubDir)
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,6 +97,11 @@ func TestConverter(t *testing.T) {
return len(blocksConverted)
})

// Verify metrics after conversion
require.Equal(t, float64(len(blocksConverted)), testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
require.Greater(t, testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)), 0.0)
require.Equal(t, 1.0, testutil.ToFloat64(c.metrics.ownedUsers))

// Verify all files are there
for _, block := range blocksConverted {
for _, file := range []string{
Expand All @@ -121,6 +127,20 @@ func TestConverter(t *testing.T) {
test.Poll(t, time.Minute, 0, func() interface{} {
return len(c.listTenantsWithMetaSyncDirectories())
})

// Verify metrics after user deletion
test.Poll(t, time.Minute*10, true, func() interface{} {
if testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)) != 0.0 {
return false
}
if testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)) != 0.0 {
return false
}
if testutil.ToFloat64(c.metrics.ownedUsers) != 0.0 {
return false
}
return true
})
}

func prepareConfig() Config {
Expand Down Expand Up @@ -159,3 +179,42 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides, scanner)
return c, logger, registry
}

// TestConverter_CleanupMetricsForNotOwnedUser verifies that per-user metrics
// are deleted only for users the converter owned on the previous iteration:
// cleanup is a no-op for never-owned users, and removes the series once the
// user is recorded in lastOwnedUsers.
func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) {
	// Create a new registry for testing.
	reg := prometheus.NewRegistry()

	// Create a new converter with a minimal test configuration; the nil
	// bucket/logger/scanner are never touched by the code under test.
	cfg := Config{}
	storageCfg := cortex_tsdb.BlocksStorageConfig{}
	limits := &validation.Overrides{}
	converter := newConverter(cfg, nil, storageCfg, []int64{7200000}, nil, reg, limits, nil)

	// Seed some per-user metric series for a test user.
	userID := "test-user"
	converter.metrics.convertedBlocks.WithLabelValues(userID).Inc()
	converter.metrics.convertBlockDuration.WithLabelValues(userID).Set(1.0)

	// Verify the metrics exist before cleanup.
	assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
	assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))

	// Set lastOwnedUsers to empty (the user was never owned).
	converter.lastOwnedUsers = map[string]struct{}{}
	// Cleaning up metrics for the user does nothing because the user was
	// never owned, so the series must survive unchanged.
	converter.cleanupMetricsForNotOwnedUser(userID)
	assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
	assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))

	// Mark the user as previously owned.
	converter.lastOwnedUsers = map[string]struct{}{
		userID: {},
	}

	// Clean up metrics for the now-previously-owned user.
	converter.cleanupMetricsForNotOwnedUser(userID)

	// Verify the metric series are deleted (ToFloat64 on a fresh label set
	// reads 0 for counters and gauges).
	assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
	assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))
}
34 changes: 34 additions & 0 deletions pkg/parquetconverter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package parquetconverter

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// metrics holds the parquet converter's self-instrumentation metrics.
type metrics struct {
	// convertedBlocks counts successfully converted blocks, labelled by user.
	convertedBlocks *prometheus.CounterVec
	// convertBlockDuration records the duration of the most recent block
	// conversion per user (a gauge: only the latest value is kept).
	convertBlockDuration *prometheus.GaugeVec
	// ownedUsers tracks how many users this converter instance currently owns.
	ownedUsers prometheus.Gauge
}

// newMetrics creates the parquet converter metrics and registers them on reg.
func newMetrics(reg prometheus.Registerer) *metrics {
	return &metrics{
		convertedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_parquet_converter_converted_blocks_total",
			Help: "Total number of converted blocks per user.",
		}, []string{"user"}),
		convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_parquet_converter_convert_block_duration_seconds",
			// Fixed help-text grammar: was "Time taken to for the latest ...".
			Help: "Time taken for the latest block conversion for the user.",
		}, []string{"user"}),
		ownedUsers: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_parquet_converter_users_owned",
			Help: "Number of users that the parquet converter owns.",
		}),
	}
}

// deleteMetricsForTenant removes all per-user metric series for the given
// user, so stale series are not exported after the user is deleted or no
// longer owned. ownedUsers is unlabelled and is deliberately left untouched.
func (m *metrics) deleteMetricsForTenant(userID string) {
	m.convertedBlocks.DeleteLabelValues(userID)
	m.convertBlockDuration.DeleteLabelValues(userID)
}
10 changes: 10 additions & 0 deletions pkg/storage/tsdb/bucketindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ func (idx *Index) IsEmpty() bool {
return len(idx.Blocks) == 0 && len(idx.BlockDeletionMarks) == 0
}

// ParquetBlocks returns the blocks in the index that have parquet metadata
// attached (i.e. have been converted to parquet format). The result is a
// non-nil slice even when no block qualifies.
func (idx *Index) ParquetBlocks() []*Block {
	parquetBlocks := make([]*Block, 0, len(idx.Blocks))
	for _, block := range idx.Blocks {
		if block.Parquet == nil {
			continue
		}
		parquetBlocks = append(parquetBlocks, block)
	}
	return parquetBlocks
}

// Block holds the information about a block in the index.
type Block struct {
// Block ID.
Expand Down
Loading
Loading