Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
7 changes: 5 additions & 2 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -209,8 +210,10 @@ func (c *Converter) running(ctx context.Context) error {

var ring ring.ReadRing
ring = c.ring
if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
ring = c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID))
shardSize := c.limits.ParquetConverterTenantShardSize(userID)
if shardSize > 0 {
dynamicShardSize := util.DynamicShardSize(c.limits.ParquetConverterTenantShardSize(userID), ring.InstancesCount())
ring = c.ring.ShuffleShard(userID, dynamicShardSize)
}

userLogger := util_log.WithUserID(userID, c.logger)
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ type Limits struct {
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

// Parquet converter
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize int `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
Expand Down Expand Up @@ -305,7 +305,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")

f.IntVar(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")

// Store-gateway.
Expand Down Expand Up @@ -842,7 +842,7 @@ func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
}

// ParquetConverterTenantShardSize returns shard size (number of converters) used by this tenant when using shuffle-sharding strategy.
func (o *Overrides) ParquetConverterTenantShardSize(userID string) int {
func (o *Overrides) ParquetConverterTenantShardSize(userID string) float64 {
return o.GetOverridesForUser(userID).ParquetConverterTenantShardSize
}

Expand Down
Loading