Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Fix chunk cache add range corruption #1006

Merged
merged 4 commits into from
Aug 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mdata/cache/ccache_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func (mc *CCacheMetric) AddRange(prev uint32, itergens []chunk.IterGen) {
})
mc.chunks[ts] = &chunks[len(chunks)-1]
mc.keys = append(mc.keys, ts)
} else {
mc.chunks[ts].Prev = prev
}

if sortKeys {
Expand Down
77 changes: 77 additions & 0 deletions mdata/cache/ccache_metric_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package cache

import (
"errors"
"fmt"
"sort"
"testing"

"github.com/grafana/metrictank/mdata/cache/accnt"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/test"
"github.com/raintank/schema"
Expand Down Expand Up @@ -187,3 +191,76 @@ func BenchmarkAddRangeDesc64(b *testing.B) {
ccm.AddRange(0, chunks[i:i+64])
}
}

func TestCorruptionCase1(t *testing.T) {
testRun(t, func(ccm *CCacheMetric) {
chunks := generateChunks(t, 10, 6, 10)
ccm.AddRange(0, chunks[3:6])
ccm.AddRange(0, chunks[0:4])
if err := verifyCcm(ccm); err != nil {
t.Fatal(err)
}
})
}

// verifyCcm verifies the integrity of a CCacheMetric
// it assumes that all itergens are span-aware
func verifyCcm(ccm *CCacheMetric) error {
var chunk *CCacheChunk
var ok bool

if len(ccm.chunks) != len(ccm.keys) {
return errors.New("Length of ccm.chunks does not match ccm.keys")
}

if !sort.IsSorted(accnt.Uint32Asc(ccm.keys)) {
return errors.New("keys are not sorted")
}

for i, ts := range ccm.keys {
if chunk, ok = ccm.chunks[ts]; !ok {
return fmt.Errorf("Ts %d is in ccm.keys but not in ccm.chunks", ts)
}

if i == 0 {
if chunk.Prev != 0 {
return errors.New("First chunk has Prev != 0")
}
} else {
if chunk.Prev == 0 {
if ccm.chunks[ccm.keys[i-1]].Ts == chunk.Ts-chunk.Itgen.Span {
return fmt.Errorf("Chunk of ts %d has Prev == 0, but the previous chunk is present", ts)
}
} else {
if ccm.chunks[ccm.keys[i-1]].Ts != chunk.Prev {
return fmt.Errorf("Chunk of ts %d has Prev set to wrong ts %d but should be %d", ts, chunk.Prev, ccm.chunks[ccm.keys[i-1]].Ts)
}
}
}

if i == len(ccm.keys)-1 {
if chunk.Next != 0 {
return fmt.Errorf("Next of last chunk should be 0, but it's %d", chunk.Next)
}

// all checks completed
break
}

var nextChunk *CCacheChunk
if nextChunk, ok = ccm.chunks[ccm.keys[i+1]]; !ok {
return fmt.Errorf("Ts %d is in ccm.keys but not in ccm.chunks", ccm.keys[i+1])
}

if chunk.Next == 0 {
if chunk.Ts+chunk.Itgen.Span == nextChunk.Ts {
return fmt.Errorf("Next of chunk at ts %d is set to 0, but the next chunk is present", ts)
}
} else {
if chunk.Next != nextChunk.Ts {
return fmt.Errorf("Next of chunk at ts %d is set to %d, but it should be %d", ts, chunk.Next, nextChunk.Ts)
}
}
}
return nil
}