Skip to content

Commit

Permalink
Merge pull request #424 from metrico/fix_421
Browse files Browse the repository at this point in the history
fix: gaps in the data #421
  • Loading branch information
akvlad authored Jan 9, 2024
2 parents 1bcbd62 + 1b61a80 commit 34813e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
76 changes: 61 additions & 15 deletions wasm_parts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func stats() {
//export pqlRangeQuery
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id}
queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
return getEng().NewRangeQuery(
queriable,
nil,
Expand All @@ -159,7 +159,7 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint
//export pqlInstantQuery
func pqlInstantQuery(id uint32, timeMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id}
queriable := &TestQueryable{id: id, stepMs: 15000}
return getEng().NewInstantQuery(
queriable,
nil,
Expand All @@ -170,7 +170,7 @@ func pqlInstantQuery(id uint32, timeMS float64) uint32 {

//export pqlSeries
func pqlSeries(id uint32) uint32 {
queriable := &TestQueryable{id: id}
queriable := &TestQueryable{id: id, stepMs: 15000}
query, err := getEng().NewRangeQuery(
queriable,
nil,
Expand Down Expand Up @@ -329,7 +329,8 @@ func (t TestLogger) Log(keyvals ...interface{}) error {
}

type TestQueryable struct {
id uint32
id uint32
stepMs int64
}

func (t TestQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
Expand All @@ -338,11 +339,12 @@ func (t TestQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Q
for r.i < uint32(len(data[t.id].request)) {
sets[r.ReadString()] = r.ReadByteArray()
}
return &TestQuerier{sets: sets}, nil
return &TestQuerier{sets: sets, stepMs: t.stepMs}, nil
}

type TestQuerier struct {
sets map[string][]byte
sets map[string][]byte
stepMs int64
}

func (t TestQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
Expand All @@ -362,12 +364,14 @@ func (t TestQuerier) Select(sortSeries bool, hints *storage.SelectHints, matcher
return &TestSeriesSet{
data: t.sets[strMatchers],
reader: BinaryReader{buffer: t.sets[strMatchers]},
stepMs: t.stepMs,
}
}

type TestSeriesSet struct {
data []byte
reader BinaryReader
stepMs int64
}

func (t *TestSeriesSet) Next() bool {
Expand All @@ -376,10 +380,12 @@ func (t *TestSeriesSet) Next() bool {

func (t *TestSeriesSet) At() storage.Series {
res := &TestSeries{
i: -1,
i: 0,
stepMs: t.stepMs,
}
res.labels = t.reader.ReadLabelsTuple()
res.data = t.reader.ReadPointsArrayRaw()
res.reset()
return res
}

Expand All @@ -392,31 +398,71 @@ func (t *TestSeriesSet) Warnings() storage.Warnings {
}

type TestSeries struct {
data []byte
data []byte
stepMs int64

labels labels.Labels
i int
labels labels.Labels
tsMs int64
generatedMs int64
val float64
i int
}

func (t *TestSeries) reset() {
if len(t.data) == 0 {
return
}
t.tsMs = *(*int64)(unsafe.Pointer(&t.data[0]))
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
}

func (t *TestSeries) Next() bool {
if t.i*16 >= len(t.data) {
if t.generatedMs == 0 {
t.tsMs += t.stepMs
t.generatedMs += t.stepMs
return true
}
return false
}
ts := *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
if ts-t.tsMs > t.stepMs && t.generatedMs < 300000 {
t.tsMs += t.stepMs
t.generatedMs += t.stepMs
return true
}
t.tsMs = ts
t.generatedMs = 0
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
t.i++
return t.i*16 < len(t.data)
return true
}

func (t *TestSeries) Seek(tmMS int64) bool {
for t.i = 0; t.i*16 < len(t.data); t.i++ {
ms := *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
if ms >= tmMS {
if ms == tmMS {
t.tsMs = ms
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
t.i++
return true
}
if ms > tmMS {
t.i--
if t.i < 0 {
t.i = 0
}
t.tsMs = tmMS
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
t.i++
return true
}
}
return false
}

func (t *TestSeries) At() (int64, float64) {
ts := *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
val := *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
return ts, val
return t.tsMs, t.val
}

func (t *TestSeries) Err() error {
Expand Down
Binary file modified wasm_parts/main.wasm.gz
Binary file not shown.

0 comments on commit 34813e9

Please sign in to comment.