1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* `-alertmanager.cluster.peers` instead of `-cluster.peer`
* `-alertmanager.cluster.peer-timeout` instead of `-cluster.peer-timeout`
* [FEATURE] Ruler: added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932
* [ENHANCEMENT] Blocks storage: reduce ingester memory by eliminating series reference cache. #3951
* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
* `cortex_ruler_clients`
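The changelog entry above (#3951) describes the core of this PR: the ingester's own series reference cache is removed, and the TSDB head appender is instead asked for the reference it already tracks via GetRef(). Below is a minimal sketch of that lookup-then-append pattern, assuming the Prometheus `storage.Appender` and `storage.GetRef` interfaces this diff relies on; the package and helper names (`example`, `appendSample`) are illustrative and not part of the PR.

```go
package example

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// extendedAppender mirrors the interface added in this PR: a TSDB appender
// that also exposes GetRef().
type extendedAppender interface {
	storage.Appender
	storage.GetRef
}

// appendSample is an illustrative helper, not Cortex code: reuse the series
// reference TSDB already holds, falling back to an append by labels when the
// series is not yet known to the head.
func appendSample(app extendedAppender, lset labels.Labels, ts int64, val float64) error {
	if ref, cached := app.GetRef(lset); ref != 0 {
		// Known series: append by reference and reuse the labels copy owned by TSDB.
		_, err := app.Append(ref, cached, ts, val)
		return err
	}
	// Unknown series: append by labels; TSDB creates the series and its reference.
	_, err := app.Append(0, lset, ts, val)
	return err
}
```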
33 changes: 24 additions & 9 deletions go.mod
@@ -12,7 +12,7 @@ require (
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/armon/go-metrics v0.3.6
github.com/aws/aws-sdk-go v1.37.8
github.com/aws/aws-sdk-go v1.38.3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/dustin/go-humanize v1.0.0
@@ -44,10 +44,10 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/alertmanager v0.21.1-0.20210310093010-0f9cab6991e6
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.18.0
github.com/prometheus/prometheus v1.8.2-0.20210321183757-31a518faab18
github.com/prometheus/common v0.20.0
github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
@@ ... @@
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
go.uber.org/atomic v1.7.0
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
google.golang.org/api v0.39.0
google.golang.org/grpc v1.34.0
golang.org/x/net v0.0.0-20210324051636-2c4c8ecb7826
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/api v0.42.0
google.golang.org/grpc v1.36.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
sigs.k8s.io/yaml v1.2.0
@@ -101,3 +101,18 @@ replace github.com/go-openapi/strfmt => github.com/go-openapi/strfmt v0.19.5
replace github.com/go-openapi/swag => github.com/go-openapi/swag v0.19.9

replace github.com/go-openapi/validate => github.com/go-openapi/validate v0.19.8

// Pin these, which are updated as dependencies in Prometheus; we will take those updates separately and carefully
replace (
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.37.8
github.com/google/pprof => github.com/google/pprof v0.0.0-20210208152844-1612e9be7af6
github.com/miekg/dns => github.com/miekg/dns v1.1.38
github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.9.0
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
golang.org/x/net => golang.org/x/net v0.0.0-20210119194325-5f4716e94777
golang.org/x/oauth2 => golang.org/x/oauth2 v0.0.0-20210210192628-66670185b0cd
golang.org/x/sync => golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys => golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
google.golang.org/api => google.golang.org/api v0.39.0
google.golang.org/grpc => google.golang.org/grpc v1.34.0
)
369 changes: 20 additions & 349 deletions go.sum

Large diffs are not rendered by default.

68 changes: 18 additions & 50 deletions pkg/ingester/ingester_v2.go
@@ -101,7 +101,6 @@ const (
type userTSDB struct {
db *tsdb.DB
userID string
refCache *cortex_tsdb.RefCache
activeSeries *ActiveSeries
seriesInMetric *metricCounter
limiter *Limiter
@@ -185,7 +184,8 @@ func (u *userTSDB) compactHead(blockDuration int64) error {

defer u.casState(forceCompacting, active)

// Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
// Ingestion of samples in parallel with forced compaction can lead to overlapping blocks,
// and possible invalidation of the references returned from Appender.GetRef().
// So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
u.pushesInFlight.Wait()

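The comment above explains why forced head compaction must first drain in-flight push requests: samples appended while the head is being force-compacted could produce overlapping blocks and invalidate references previously returned by Appender.GetRef(). A simplified sketch of that coordination, using assumed names (`userTSDBSketch`, `acquireAppendLock`) rather than the actual Cortex types:

```go
package example

import (
	"errors"
	"sync"
)

const (
	active = iota
	forceCompacting
)

// userTSDBSketch is an illustrative stand-in for the per-tenant TSDB wrapper.
type userTSDBSketch struct {
	stateMtx       sync.RWMutex
	state          int
	pushesInFlight sync.WaitGroup
}

// acquireAppendLock is called at the start of every push request.
func (u *userTSDBSketch) acquireAppendLock() error {
	u.stateMtx.RLock()
	defer u.stateMtx.RUnlock()
	if u.state != active {
		return errors.New("forced compaction in progress")
	}
	// Registering under the read lock guarantees compactHead cannot miss us.
	u.pushesInFlight.Add(1)
	return nil
}

func (u *userTSDBSketch) releaseAppendLock() {
	u.pushesInFlight.Done()
}

// compactHead flips the state so new pushes fail fast, drains the pushes
// already in flight, and only then compacts the head.
func (u *userTSDBSketch) compactHead(compact func() error) error {
	u.stateMtx.Lock()
	if u.state != active {
		u.stateMtx.Unlock()
		return errors.New("compaction already in progress")
	}
	u.state = forceCompacting
	u.stateMtx.Unlock()

	defer func() {
		u.stateMtx.Lock()
		u.state = active
		u.stateMtx.Unlock()
	}()

	// No new pushes can register now; wait for the in-flight ones so no samples
	// land in the head while it is force-compacted.
	u.pushesInFlight.Wait()
	return compact()
}
```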
@@ -383,7 +383,6 @@ type TSDBState struct {
walReplayTime prometheus.Histogram
appenderAddDuration prometheus.Histogram
appenderCommitDuration prometheus.Histogram
refCachePurgeDuration prometheus.Histogram
idleTsdbChecks *prometheus.CounterVec
}

@@ -435,11 +434,6 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
Help: "The total time it takes for a push request to commit samples appended to TSDB.",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}),
refCachePurgeDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_tsdb_refcache_purge_duration_seconds",
Help: "The total time it takes to purge the TSDB series reference cache for a single tenant.",
Buckets: prometheus.DefBuckets,
}),

idleTsdbChecks: idleTsdbChecks,
}
@@ -619,11 +613,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

// We use an hardcoded value for this ticker because there should be no
// real value in customizing it.
refCachePurgeTicker := time.NewTicker(5 * time.Minute)
defer refCachePurgeTicker.Stop()

var activeSeriesTickerChan <-chan time.Time
if i.cfg.ActiveSeriesMetricsEnabled {
t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod)
@@ ... @@
db.ingestedRuleSamples.tick()
}
i.userStatesMtx.RUnlock()
case <-refCachePurgeTicker.C:
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
continue
}

startTime := time.Now()
userDB.refCache.Purge(startTime.Add(-cortex_tsdb.DefaultRefCacheTTL))
i.TSDBState.refCachePurgeDuration.Observe(time.Since(startTime).Seconds())
}

case <-activeSeriesTickerChan:
i.v2UpdateActiveSeries()
@@ ... @@
}
}

// GetRef() is an extra method added to TSDB to let Cortex check before calling Add()
type extendedAppender interface {
storage.Appender
storage.GetRef
}

// v2Push adds metrics to a block
func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
var firstPartialErr error
@@ -738,13 +722,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
)

// Walk the samples, appending them to the users database
app := db.Appender(ctx)
app := db.Appender(ctx).(extendedAppender)
for _, ts := range req.Timeseries {
// Check if we already have a cached reference for this series. Be aware
// that even if we have a reference it's not guaranteed to be still valid.
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
cachedRef, copiedLabels, cachedRefExists := db.refCache.Ref(startAppend, cortexpb.FromLabelAdaptersToLabels(ts.Labels))

// Look up a reference for this series.
ref, copiedLabels := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels))

// To find out if any sample was added to this series, we keep old value.
oldSucceededSamplesCount := succeededSamplesCount
Expand All @@ -753,30 +737,18 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
var err error

// If the cached reference exists, we try to use it.
if cachedRefExists {
var ref uint64
if ref, err = app.Append(cachedRef, copiedLabels, s.TimestampMs, s.Value); err == nil {
if ref != 0 {
if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
// This means the reference changes which means we need to update our cache.
if ref != cachedRef {
db.refCache.SetRef(startAppend, copiedLabels, ref)
}
continue
}

} else {
var ref uint64

// Copy the label set because both TSDB and the cache may retain it.
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)

// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
db.refCache.SetRef(startAppend, copiedLabels, ref)

// Set these in case there are multiple samples for the series.
cachedRef = ref
cachedRefExists = true

succeededSamplesCount++
continue
}
@@ -827,11 +799,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor

if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
db.activeSeries.UpdateSeries(cortexpb.FromLabelAdaptersToLabels(ts.Labels), startAppend, func(l labels.Labels) labels.Labels {
// If we have already made a copy during this push, no need to create new one.
if copiedLabels != nil {
return copiedLabels
}
return cortexpb.CopyLabels(l)
// we must already have copied the labels if succeededSamplesCount has been incremented.
return copiedLabels
})
}
}
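The UpdateSeries call above hands the active-series tracker a callback instead of an eagerly copied label set, so a copy is only made (or reused) when the tracker actually stores the series. A rough sketch of that copy-on-first-insert idea; the `seriesTracker` type and its fields are assumptions for illustration, not the Cortex ActiveSeries implementation.

```go
package example

import (
	"sync"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
)

// seriesTracker is an illustrative stand-in for an active-series tracker. It
// retains the label sets it stores, which is why callers must hand over a
// copy that outlives the push request.
type seriesTracker struct {
	mtx    sync.Mutex
	series map[uint64]trackedSeries
}

type trackedSeries struct {
	lset     labels.Labels
	lastSeen time.Time
}

// UpdateSeries records that the series was seen at `now`. copyFn is invoked
// only when the series is inserted for the first time, so a caller that has
// already copied the labels during this push avoids a second allocation.
func (t *seriesTracker) UpdateSeries(series labels.Labels, now time.Time, copyFn func(labels.Labels) labels.Labels) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if t.series == nil {
		t.series = map[uint64]trackedSeries{}
	}

	key := series.Hash()
	entry, ok := t.series[key]
	if !ok {
		// First sighting: store a copy owned by the tracker.
		entry = trackedSeries{lset: copyFn(series)}
	}
	entry.lastSeen = now
	t.series[key] = entry
}
```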
@@ -1435,7 +1404,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

userDB := &userTSDB{
userID: userID,
refCache: cortex_tsdb.NewRefCache(),
activeSeries: NewActiveSeries(),
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),