Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
7 changes: 5 additions & 2 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -209,8 +210,10 @@ func (c *Converter) running(ctx context.Context) error {

var ring ring.ReadRing
ring = c.ring
if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
ring = c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID))
shardSize := c.limits.ParquetConverterTenantShardSize(userID)
if shardSize > 0 {
dynamicShardSize := util.DynamicShardSize(c.limits.ParquetConverterTenantShardSize(userID), ring.InstancesCount())
ring = c.ring.ShuffleShard(userID, dynamicShardSize)
}

userLogger := util_log.WithUserID(userID, c.logger)
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ type Limits struct {
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

// Parquet converter
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize int `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
Expand Down Expand Up @@ -305,7 +305,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")

f.IntVar(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")

// Store-gateway.
Expand Down Expand Up @@ -842,7 +842,7 @@ func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
}

// ParquetConverterTenantShardSize returns shard size (number of converters) used by this tenant when using shuffle-sharding strategy.
func (o *Overrides) ParquetConverterTenantShardSize(userID string) int {
func (o *Overrides) ParquetConverterTenantShardSize(userID string) float64 {
return o.GetOverridesForUser(userID).ParquetConverterTenantShardSize
}

Expand Down
Loading