Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
correct Metricdef.LastUpdate when loading from persistent index
Browse files Browse the repository at this point in the history
(instead of at previous fix, which did it at query time for Find(),
see #1532)

This has two main benefits:
1) by making the change to the data, it works equally well across all
   types of queries, in particular this fixes the behavior for TagQuery
2) we no longer over-eagerly adjust the check at query time (if MT has
   seen a new point for a given metric - e.g. if the process has been up for
   a while - then the LastUpdate value in the memory index is perfectly
   accurate, and we don't need to make any adjustment)

fix #1979
  • Loading branch information
Dieterbe committed Jun 3, 2021
1 parent 427b907 commit c03cf44
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
22 changes: 15 additions & 7 deletions idx/bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/stats"
"github.com/grafana/metrictank/util"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -261,13 +262,6 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
}

// Find matches pattern against the index for the given org, considering only
// series whose LastUpdate is at or after from, and caps results at limit.
func (b *BigtableIdx) Find(orgId uint32, pattern string, from, limit int64) ([]idx.Node, error) {
// The lastUpdate timestamp does not get updated in the bigtable index every time when
// a data point is received, there can be a delay of up to b.cfg.updateInterval32. To
// avoid falsely excluding a metric based on its lastUpdate timestamp we offset the
// from time by updateInterval32, this way we err on the "too inclusive" side
if from > int64(b.cfg.updateInterval32) {
from -= int64(b.cfg.updateInterval32)
}
// the actual lookup is served entirely by the embedded in-memory index
return b.MemoryIndex.Find(orgId, pattern, from, limit)
}

Expand All @@ -286,6 +280,10 @@ func (b *BigtableIdx) rebuildIndex() {
}

func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, now time.Time, orgFilter int) []schema.MetricDefinition {

maxLastUpdate := time.Now().Unix()
updateInterval := int64(b.cfg.updateInterval32)

ctx := context.Background()
rr := bigtable.PrefixRange(fmt.Sprintf("%d_", partition))
defsByNames := make(map[string][]schema.MetricDefinition)
Expand All @@ -300,6 +298,16 @@ func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinit
return true
}
log.Debugf("bigtable-idx: found def %+v", def)

// because metricdefs get saved no more frequently than every updateInterval
// the lastUpdate field may be out of date by that amount (or more if the process
// struggled writing data. See updateBigtable() )
// To compensate, we bump it here. This should make sure to include all series
// that have data for queries but didn't see an update to the index, at the cost
// of potentially including some series in queries that don't have data, but that's OK
// (that's how Graphite works anyway)
def.LastUpdate = util.MinInt64(maxLastUpdate, def.LastUpdate+updateInterval)

nameWithTags := def.NameWithTags()
defsByNames[nameWithTags] = append(defsByNames[nameWithTags], def)
return true
Expand Down
36 changes: 20 additions & 16 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,6 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
}

// Find matches pattern against the index for the given org, considering only
// series whose LastUpdate is at or after from, and caps results at limit.
func (c *CasIdx) Find(orgId uint32, pattern string, from, limit int64) ([]idx.Node, error) {
// The lastUpdate timestamp does not get updated in the cassandra index every time when
// a data point is received, there can be a delay of up to c.updateInterval32. To avoid
// falsely excluding a metric based on its lastUpdate timestamp we offset the from time
// by updateInterval32, this way we err on the "too inclusive" side
if from > int64(c.updateInterval32) {
from -= int64(c.updateInterval32)
}
// the actual lookup is served entirely by the embedded in-memory index
return c.MemoryIndex.Find(orgId, pattern, from, limit)
}

Expand Down Expand Up @@ -366,6 +359,10 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, now time
var partition int32
var lastupdate int64
var tags []string

maxLastUpdate := time.Now().Unix()
updateInterval := int64(c.updateInterval32)

for iter.Scan(&id, &orgId, &partition, &name, &interval, &unit, &mtype, &tags, &lastupdate) {
mkey, err := schema.MKeyFromString(id)
if err != nil {
Expand All @@ -377,15 +374,22 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, now time
}

mdef := &schema.MetricDefinition{
Id: mkey,
OrgId: uint32(orgId),
Partition: partition,
Name: name,
Interval: interval,
Unit: unit,
Mtype: mtype,
Tags: tags,
LastUpdate: lastupdate,
Id: mkey,
OrgId: uint32(orgId),
Partition: partition,
Name: name,
Interval: interval,
Unit: unit,
Mtype: mtype,
Tags: tags,
// because metricdefs get saved no more frequently than every updateInterval
// the lastUpdate field may be out of date by that amount (or more if the process
// struggled writing data. See updateCassandra() )
// To compensate, we bump it here. This should make sure to include all series
// that have data for queries but didn't see an update to the index, at the cost
// of potentially including some series in queries that don't have data, but that's OK
// (that's how Graphite works anyway)
LastUpdate: util.MinInt64(maxLastUpdate, lastupdate+updateInterval),
}
nameWithTags := mdef.NameWithTags()
defsByNames[nameWithTags] = append(defsByNames[nameWithTags], mdef)
Expand Down
7 changes: 7 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ func Min(a, b uint32) uint32 {
return b
}

// MinInt64 returns the smaller of the two int64 values a and b.
func MinInt64(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}

func Max(a, b uint32) uint32 {
if a > b {
return a
Expand Down

0 comments on commit c03cf44

Please sign in to comment.