[release 2.53] Revert 13583 to stop dropping samples in remote-write catch-up (#14446)

* Revert "fix bug that would cause us to endlessly fall behind (#13583)"
This reverts commit 0c71230.
(leaving the new test in place)

* TSDB: enhance TestRun_AvoidNotifyWhenBehind
With code suggested by @cstyan in #14439.

* WAL watcher: add back log line showing current segment

---------

Signed-off-by: Bryan Boreham <[email protected]>
bboreham committed Jul 10, 2024
1 parent 4c35b92 commit 7083ae8
Showing 2 changed files with 65 additions and 66 deletions.
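
The functional change is in Watcher.Run() (first diff below): lastSegment is now read once via firstAndLast() before the read loop, and currentSegment >= lastSegment is passed to watch() as the tail flag, instead of re-reading lastSegment on every iteration as #13583 did. The toy model below is an illustration, not Prometheus code; it assumes, per the commit title, that segments read in non-tail (replay) mode have their sample records dropped. It shows how the per-iteration re-check can keep a catching-up watcher in replay mode for as long as a writer keeps adding segments:

package main

import "fmt"

// Toy model of the watcher's catch-up loop. It only illustrates the difference
// between fixing lastSegment once before the loop (behaviour restored by this
// revert) and re-reading it on every iteration (behaviour introduced by #13583).
func simulate(recheckEachIteration bool) (tailed, replayed int) {
	lastSegment := 1 // segments present when Run() starts
	newest := 1      // newest segment on disk; a writer keeps adding more

	for current := 0; current <= 10; current++ {
		if recheckEachIteration {
			lastSegment = newest // behaviour introduced by #13583
		}
		if current >= lastSegment {
			tailed++ // tail mode: sample records are forwarded
		} else {
			replayed++ // replay mode: sample records are skipped
		}
		newest++ // the writer produced another segment meanwhile
	}
	return tailed, replayed
}

func main() {
	t, r := simulate(false)
	fmt.Printf("lastSegment fixed at start (this revert): %d tailed, %d replayed\n", t, r)
	t, r = simulate(true)
	fmt.Printf("lastSegment re-read each iteration (#13583): %d tailed, %d replayed\n", t, r)
}

With the re-check, the writer stays one segment ahead on every iteration, so nothing in this window is tailed; with lastSegment pinned at startup, only segments that already existed when Run() started are replayed.
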
21 changes: 10 additions & 11 deletions tsdb/wlog/watcher.go
@@ -262,6 +262,11 @@ func (w *Watcher) loop() {
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}

// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
w.sendSamples = false
@@ -286,20 +291,14 @@ func (w *Watcher) Run() error {
return err
}

level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))

// Re-check on each iteration in case a new segment was added,
// because watch() will wait for notifications on the last segment.
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
tail := currentSegment >= lastSegment

level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
return err
}

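Context for the change above (a reading of the surrounding watcher code, not something shown in this diff): watch() takes the tail flag as its second argument, and segments read in non-tail (replay) mode skip sample records rather than forwarding them to remote write. Re-reading lastSegment on every iteration, as #13583 did, meant a watcher that was catching up while the WAL kept growing could treat each new segment as replay and drop its samples; pinning lastSegment once at startup restores tailing for everything written after Run() began.
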
110 changes: 55 additions & 55 deletions tsdb/wlog/watcher_test.go
@@ -17,6 +17,7 @@ import (
"math/rand"
"os"
"path"
"runtime"
"sync"
"testing"
"time"
@@ -698,11 +699,46 @@ func TestRun_StartupTime(t *testing.T) {
}
}

func generateWALRecords(w *WL, segment, seriesCount, samplesCount int) error {
enc := record.Encoder{}
for j := 0; j < seriesCount; j++ {
ref := j + (segment * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", segment)),
},
}, nil)
if err := w.Log(series); err != nil {
return err
}

for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(segment),
V: float64(segment),
},
}, nil)
if err := w.Log(sample); err != nil {
return err
}
}
}
return nil
}

func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
const samplesCount = 300
if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
t.SkipNow()
}
const segmentSize = pageSize // Smallest allowed segment size.
const segmentsToWrite = 5
const segmentsToRead = segmentsToWrite - 1
const seriesCount = 10
const samplesCount = 50

// This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time.Second
@@ -715,73 +751,37 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)

enc := record.Encoder{}
w, err := NewSize(nil, nil, wdir, pageSize, compress)
w, err := NewSize(nil, nil, wdir, segmentSize, compress)
require.NoError(t, err)
var wg sync.WaitGroup
// add one segment initially to ensure there's a value > 0 for the last segment id
for i := 0; i < 1; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))

for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
w.NextSegment() // Force creation of the next segment
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < segments; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))

for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
for i := 1; i < segmentsToWrite; i++ {
require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
w.NextSegment()
}
}()

wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments
watcher.MaxSegment = segmentsToRead

watcher.setMetrics()
startTime := time.Now()
err = watcher.Run()
wg.Wait()
require.Less(t, time.Since(startTime), readTimeout)

// But samples records shouldn't get dropped
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() > 0
})
require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)

require.NoError(t, err)
require.NoError(t, w.Close())
})
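
With the new constants, the final assertion expects segmentsToRead * seriesCount * samplesCount = 4 * 10 * 50 = 2,000 appended samples (presumably the samples from the four segments written after the initial one), so samples dropped during catch-up now make the test fail.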