Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/prometheus/query_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,19 @@ func (p *queryLogTest) run(t *testing.T) {
p.setQueryLog(t, "")
}

params := append([]string{"-test.main", "--config.file=" + p.configFile.Name(), "--web.enable-lifecycle", fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port)}, p.params()...)
dir, err := ioutil.TempDir("", "query_log_test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

params := append([]string{
"-test.main",
"--config.file=" + p.configFile.Name(),
"--web.enable-lifecycle",
fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port),
"--storage.tsdb.path=" + dir,
}, p.params()...)

prom := exec.Command(promPath, params...)

Expand Down
14 changes: 10 additions & 4 deletions tsdb/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/prometheus/prometheus/util/testutil"
)

Expand Down Expand Up @@ -303,7 +304,13 @@ func TestReadIndexFormatV1(t *testing.T) {

// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
return createBlockFromHead(tb, dir, createHead(tb, series))
w, close := newTestNoopWal(tb)
defer close()
head := createHead(tb, series, w)
defer func() {
testutil.Ok(tb, head.Close())
}()
return createBlockFromHead(tb, dir, head)
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
Expand All @@ -319,10 +326,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String())
}

func createHead(tb testing.TB, series []Series) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
func createHead(tb testing.TB, series []Series, w wal.WAL) *Head {
head, err := NewHead(nil, nil, w, 2*60*60*1000, nil)
testutil.Ok(tb, err)
defer head.Close()

app := head.Appender()
for _, s := range series {
Expand Down
1 change: 0 additions & 1 deletion tsdb/chunkenc/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func testChunk(c Chunk) error {

app.Append(ts, v)
exp = append(exp, pair{t: ts, v: v})
// fmt.Println("appended", len(c.Bytes()), c.Bytes())
}

it := c.Iterator(nil)
Expand Down
62 changes: 37 additions & 25 deletions tsdb/chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,40 +190,50 @@ func (w *Writer) cut() error {
return err
}

p, _, err := nextSequenceFile(w.dirFile.Name())
n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, true, w.segmentSize)
if err != nil {
return err
}
w.n = int64(n)

w.files = append(w.files, f)
if w.wbuf != nil {
w.wbuf.Reset(f)
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
}

return nil
}

func cutSegmentFile(dirFile *os.File, chunksFormat byte, preallocate bool, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) {
p, seq, err := nextSequenceFile(dirFile.Name())
if err != nil {
return 0, nil, 0, err
}
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
return 0, nil, 0, err
}
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
return err
if preallocate {
if err = fileutil.Preallocate(f, segmentSize, true); err != nil {
return 0, nil, 0, err
}
}
if err = w.dirFile.Sync(); err != nil {
return err
if err = dirFile.Sync(); err != nil {
return 0, nil, 0, err
}

// Write header metadata for new file.
metab := make([]byte, SegmentHeaderSize)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormatV1
metab[4] = chunksFormat

n, err := f.Write(metab)
if err != nil {
return err
return 0, nil, 0, err
}
w.n = int64(n)

w.files = append(w.files, f)
if w.wbuf != nil {
w.wbuf.Reset(f)
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
}

return nil
return n, f, seq, nil
}

func (w *Writer) write(b []byte) error {
Expand Down Expand Up @@ -464,8 +474,6 @@ type Reader struct {

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64

for i, b := range cr.bs {
if b.Len() < SegmentHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
Expand All @@ -479,9 +487,8 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return nil, errors.Errorf("invalid chunk format version %d", v)
}
totalSize += int64(b.Len())
cr.size += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}

Expand Down Expand Up @@ -594,9 +601,15 @@ func nextSequenceFile(dir string) (string, int, error) {
if err != nil {
continue
}
i = j
if j > i {
i = j
}
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
return segmentFile(dir, int(i+1)), int(i + 1), nil
}

func segmentFile(baseDir string, index int) string {
return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
}

func sequenceFiles(dir string) ([]string, error) {
Expand All @@ -605,7 +618,6 @@ func sequenceFiles(dir string) ([]string, error) {
return nil, err
}
var res []string

for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
Expand Down
Loading