Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/parquet-go/parquet-go v0.25.0
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e
github.com/prometheus/procfs v0.15.1
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e h1:paj+lHT5gwRPg+vKgL5Crr1lwmS3TdA1EePCFEVfaD4=
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
Expand Down
38 changes: 33 additions & 5 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"flag"
"fmt"
"hash/fnv"
"math/rand"
"os"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -47,6 +49,8 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`

DataDir string `yaml:"data_dir"`

Expand Down Expand Up @@ -85,7 +89,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
Expand Down Expand Up @@ -163,6 +169,10 @@ func (c *Converter) running(ctx context.Context) error {
continue
}
ownedUsers := map[string]struct{}{}
rand.Shuffle(len(users), func(i, j int) {
users[i], users[j] = users[j], users[i]
})

for _, userID := range users {
if !c.limits.ParquetConverterEnabled(userID) {
continue
Expand Down Expand Up @@ -293,11 +303,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
return errors.Wrap(err, "error creating block fetcher")
}

blocks, _, err := fetcher.Fetch(ctx)
blks, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrapf(err, "failed to fetch blocks for user %s", userID)
}

blocks := make([]*metadata.Meta, 0, len(blks))
for _, blk := range blks {
blocks = append(blocks, blk)
}

sort.Slice(blocks, func(i, j int) bool {
return blocks[i].MinTime > blocks[j].MinTime
})

for _, b := range blocks {
ok, err := c.ownBlock(ring, b.ULID.String())
if err != nil {
Expand Down Expand Up @@ -345,22 +364,31 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
}

level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
extraOpts := []convert.ConvertOption{
convert.WithSortBy(labels.MetricName),
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(c.cfg.MaxRowsPerRowGroup),
convert.WithName(b.ULID.String()),
}

if c.cfg.FileBufferEnabled {
extraOpts = append(extraOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
}

_, err = convert.ConvertTSDBBlock(
ctx,
uBucket,
tsdbBlock.MinTime(),
tsdbBlock.MaxTime(),
[]convert.Convertible{tsdbBlock},
convert.WithSortBy(labels.MetricName),
convert.WithColDuration(time.Hour*8),
convert.WithName(b.ULID.String()),
convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
extraOpts...,
)

_ = tsdbBlock.Close()

if err != nil {
level.Error(logger).Log("msg", "Error converting block", "err", err)
continue
}

err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)
Expand Down
56 changes: 55 additions & 1 deletion pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/schema"
"github.com/prometheus-community/parquet-common/search"
Expand All @@ -22,8 +23,10 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/multierror"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type parquetQueryableFallbackMetrics struct {
Expand Down Expand Up @@ -59,12 +62,15 @@ type parquetQueryableWithFallback struct {

// metrics
metrics *parquetQueryableFallbackMetrics

limits *validation.Overrides
logger log.Logger
}

func NewParquetQueryable(
config Config,
storageCfg cortex_tsdb.BlocksStorageConfig,
limits BlocksStoreLimits,
limits *validation.Overrides,
blockStorageQueryable *BlocksStoreQueryable,
logger log.Logger,
reg prometheus.Registerer,
Expand Down Expand Up @@ -115,6 +121,8 @@ func NewParquetQueryable(
subservicesWatcher: services.NewFailureWatcher(),
finder: blockStorageQueryable.finder,
metrics: newParquetQueryableFallbackMetrics(reg),
limits: limits,
logger: logger,
}

p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
Expand Down Expand Up @@ -164,6 +172,8 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
blocksStoreQuerier: bsq,
finder: p.finder,
metrics: p.metrics,
limits: p.limits,
logger: p.logger,
}, nil
}

Expand All @@ -181,9 +191,24 @@ type parquetQuerierWithFallback struct {

// metrics
metrics *parquetQueryableFallbackMetrics

limits *validation.Overrides
logger log.Logger
}

func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, nil, err
}

if q.limits.QueryVerticalShardSize(userID) > 1 {
uLogger := util_log.WithUserID(userID, q.logger)
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")

return q.blocksStoreQuerier.LabelValues(ctx, name, hints, matchers...)
}

remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -229,6 +254,18 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
}

func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, nil, err
}

if q.limits.QueryVerticalShardSize(userID) > 1 {
uLogger := util_log.WithUserID(userID, q.logger)
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")

return q.blocksStoreQuerier.LabelNames(ctx, hints, matchers...)
}

remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -275,6 +312,18 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
}

func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
userID, err := tenant.TenantID(ctx)
if err != nil {
return storage.ErrSeriesSet(err)
}

if q.limits.QueryVerticalShardSize(userID) > 1 {
uLogger := util_log.WithUserID(userID, q.logger)
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")

return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...)
}

mint, maxt, limit := q.minT, q.maxT, 0

if hints != nil {
Expand All @@ -288,6 +337,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool

serieSets := []storage.SeriesSet{}

// Let's request sorted series so the sets from parquet and block storage can be merged.
if len(parquet) > 0 && len(remaining) > 0 {
sortSeries = true
}

if len(parquet) > 0 {
serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...))
}
Expand Down
Loading
Loading