Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/parquet-go/parquet-go v0.25.0
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0
github.com/prometheus/procfs v0.15.1
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0 h1:XCSo9v3if0v0G+aAO/hSUr/Ck9KJXcUPzDFt1dJnAV8=
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
Expand Down
41 changes: 36 additions & 5 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"flag"
"fmt"
"hash/fnv"
"math/rand"
"os"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -47,6 +49,8 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`

DataDir string `yaml:"data_dir"`

Expand Down Expand Up @@ -78,14 +82,18 @@ type Converter struct {
blockRanges []int64

fetcherMetrics *block.FetcherMetrics

baseConverterOptions []convert.ConvertOption
}

// RegisterFlags binds the parquet converter settings to the given flag set.
// It first delegates to cfg.Ring.RegisterFlags and then registers every
// "parquet-converter.*" flag declared directly on Config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	// Default upper bound on rows per parquet row group.
	const defaultMaxRowsPerRowGroup = 1e6

	cfg.Ring.RegisterFlags(f)

	// Where the converter works and how often it runs.
	f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
	f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")

	// Concurrency, row group sizing and write buffering.
	f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
	f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", defaultMaxRowsPerRowGroup, "Max number of rows per parquet row group.")
	f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
Expand All @@ -106,6 +114,11 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
blockRanges: blockRanges,
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithSortBy(labels.MetricName),
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
},
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
Expand Down Expand Up @@ -163,6 +176,10 @@ func (c *Converter) running(ctx context.Context) error {
continue
}
ownedUsers := map[string]struct{}{}
rand.Shuffle(len(users), func(i, j int) {
users[i], users[j] = users[j], users[i]
})

for _, userID := range users {
if !c.limits.ParquetConverterEnabled(userID) {
continue
Expand Down Expand Up @@ -293,11 +310,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
return errors.Wrap(err, "error creating block fetcher")
}

blocks, _, err := fetcher.Fetch(ctx)
blks, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrapf(err, "failed to fetch blocks for user %s", userID)
}

blocks := make([]*metadata.Meta, 0, len(blks))
for _, blk := range blks {
blocks = append(blocks, blk)
}

sort.Slice(blocks, func(i, j int) bool {
return blocks[i].MinTime > blocks[j].MinTime
})

for _, b := range blocks {
ok, err := c.ownBlock(ring, b.ULID.String())
if err != nil {
Expand Down Expand Up @@ -345,22 +371,27 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
}

level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)

converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))

if c.cfg.FileBufferEnabled {
converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
}

_, err = convert.ConvertTSDBBlock(
ctx,
uBucket,
tsdbBlock.MinTime(),
tsdbBlock.MaxTime(),
[]convert.Convertible{tsdbBlock},
convert.WithSortBy(labels.MetricName),
convert.WithColDuration(time.Hour*8),
convert.WithName(b.ULID.String()),
convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
converterOpts...,
)

_ = tsdbBlock.Close()

if err != nil {
level.Error(logger).Log("msg", "Error converting block", "err", err)
continue
}

err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)
Expand Down
65 changes: 54 additions & 11 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/schema"
"github.com/prometheus-community/parquet-common/search"
Expand All @@ -16,14 +18,17 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/multierror"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type parquetQueryableFallbackMetrics struct {
Expand Down Expand Up @@ -59,12 +64,15 @@ type parquetQueryableWithFallback struct {

// metrics
metrics *parquetQueryableFallbackMetrics

limits *validation.Overrides
logger log.Logger
}

func NewParquetQueryable(
config Config,
storageCfg cortex_tsdb.BlocksStorageConfig,
limits BlocksStoreLimits,
limits *validation.Overrides,
blockStorageQueryable *BlocksStoreQueryable,
logger log.Logger,
reg prometheus.Registerer,
Expand Down Expand Up @@ -93,18 +101,29 @@ func NewParquetQueryable(
}
userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits)

shards := make([]*parquet_storage.ParquetShard, 0, len(blocks))

for _, block := range blocks {
// we always only have 1 shard - shard 0
shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, block.ID.String(), 0)
if err != nil {
return nil, err
}
shards = append(shards, shard)
shards := make([]*parquet_storage.ParquetShard, len(blocks))
errGroup := &errgroup.Group{}

for i, block := range blocks {
errGroup.Go(func() error {
// we always only have 1 shard - shard 0
shard, err := parquet_storage.OpenParquetShard(ctx,
userBkt,
block.ID.String(),
0,
parquet_storage.WithFileOptions(
parquet.SkipMagicBytes(true),
parquet.ReadBufferSize(100*1024),
parquet.SkipBloomFilters(true),
),
parquet_storage.WithOptimisticReader(true),
)
shards[i] = shard
return err
})
}

return shards, nil
return shards, errGroup.Wait()
})

p := &parquetQueryableWithFallback{
Expand All @@ -115,6 +134,8 @@ func NewParquetQueryable(
subservicesWatcher: services.NewFailureWatcher(),
finder: blockStorageQueryable.finder,
metrics: newParquetQueryableFallbackMetrics(reg),
limits: limits,
logger: logger,
}

p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
Expand Down Expand Up @@ -164,6 +185,8 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
blocksStoreQuerier: bsq,
finder: p.finder,
metrics: p.metrics,
limits: p.limits,
logger: p.logger,
}, nil
}

Expand All @@ -181,6 +204,9 @@ type parquetQuerierWithFallback struct {

// metrics
metrics *parquetQueryableFallbackMetrics

limits *validation.Overrides
logger log.Logger
}

func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
Expand Down Expand Up @@ -275,6 +301,18 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
}

func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
userID, err := tenant.TenantID(ctx)
if err != nil {
	// Return the error series set; without the return the error is dropped
	// and the query would continue with an empty userID.
	return storage.ErrSeriesSet(err)
}

if q.limits.QueryVerticalShardSize(userID) > 1 {
uLogger := util_log.WithUserID(userID, q.logger)
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")

return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...)
}

mint, maxt, limit := q.minT, q.maxT, 0

if hints != nil {
Expand All @@ -288,6 +326,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool

serieSets := []storage.SeriesSet{}

// Force sorted series when both parquet and remaining blocks are queried, so the resulting series sets can be merged.
if len(parquet) > 0 && len(remaining) > 0 {
sortSeries = true
}

if len(parquet) > 0 {
serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...))
}
Expand Down
Loading
Loading