From 35ac2afdf9516e1389ac49f9763ecf3d764e0ab5 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 13 Jun 2025 14:13:05 -0700 Subject: [PATCH 1/2] add dynamic shard size for parquet converter Signed-off-by: yeya24 --- pkg/parquetconverter/converter.go | 7 +++++-- pkg/util/validation/limits.go | 8 ++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 44901e0c6d4..236cbb203f3 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/cortexproject/cortex/pkg/util" "hash/fnv" "math/rand" "os" @@ -209,8 +210,10 @@ func (c *Converter) running(ctx context.Context) error { var ring ring.ReadRing ring = c.ring - if c.limits.ParquetConverterTenantShardSize(userID) > 0 { - ring = c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID)) + shardSize := c.limits.ParquetConverterTenantShardSize(userID) + if shardSize > 0 { + dynamicShardSize := util.DynamicShardSize(c.limits.ParquetConverterTenantShardSize(userID), ring.InstancesCount()) + ring = c.ring.ShuffleShard(userID, dynamicShardSize) } userLogger := util_log.WithUserID(userID, c.logger) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 9cc7d156e54..b31a449b23d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -207,8 +207,8 @@ type Limits struct { CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"` // Parquet converter - ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"` - ParquetConverterTenantShardSize int `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"` + ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"` + ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. @@ -305,7 +305,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit") f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit") - f.IntVar(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") + f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") // Store-gateway. @@ -842,7 +842,7 @@ func (o *Overrides) CompactorTenantShardSize(userID string) float64 { } // ParquetConverterTenantShardSize returns shard size (number of converters) used by this tenant when using shuffle-sharding strategy. -func (o *Overrides) ParquetConverterTenantShardSize(userID string) int { +func (o *Overrides) ParquetConverterTenantShardSize(userID string) float64 { return o.GetOverridesForUser(userID).ParquetConverterTenantShardSize } From e76553ffcc31a1435012d6a6252e9d57156a9d05 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 13 Jun 2025 14:19:50 -0700 Subject: [PATCH 2/2] update changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + pkg/parquetconverter/converter.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28905e2c7a7..25f963fece7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 * [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805 * [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 +* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 236cbb203f3..3bcc059c298 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "github.com/cortexproject/cortex/pkg/util" "hash/fnv" "math/rand" "os" @@ -35,6 +34,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation"