Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce a new Object Storage WAL format. #13253

Merged
merged 25 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0e5aa15
wip
cyriltovena May 16, 2024
2ef5c3c
wip
cyriltovena May 16, 2024
d68a08d
wip
cyriltovena May 28, 2024
144bb9c
add some doc and vision
cyriltovena May 30, 2024
5f8cf08
move compressed len to chunk
cyriltovena May 30, 2024
f32c755
work on the chunk encoding
cyriltovena May 31, 2024
19bbd76
missing changes
cyriltovena May 31, 2024
9e1d5b1
working on fixes and tests
cyriltovena May 31, 2024
c8b792f
add more tests and found a bug with dod
cyriltovena Jun 2, 2024
749acf7
fix(wal): Use varint encoding for ts_2_dod in WAL format
cyriltovena Jun 3, 2024
7590f55
refactor(wal): Remove unnecessary code in writeChunk function
cyriltovena Jun 3, 2024
7991408
chore: Refactor ChunkReader to improve performance and memory usage
cyriltovena Jun 3, 2024
38fcad4
chore: Add more realistic tests and benchmarks
cyriltovena Jun 3, 2024
bdf389f
refactor: Update index writer to support in memory buffer.
cyriltovena Jun 5, 2024
296daee
Pausing work: I need a new index different from the current one
cyriltovena Jun 7, 2024
d1cfcae
Add a special in memory index for the wal package
cyriltovena Jun 10, 2024
37ea6d6
Finalize writing and start reading index
cyriltovena Jun 10, 2024
d649646
Add offset/start to chunk ref
cyriltovena Jun 10, 2024
fd1dbd8
wip
cyriltovena Jun 13, 2024
b49d2ba
refactor(wal): Implement SeriesIter.
cyriltovena Jun 16, 2024
f575efb
fix(wal): Fixes snappy block offsets counting.
cyriltovena Jun 17, 2024
071ee04
chore: update format doc to reflect latest changes
cyriltovena Jun 18, 2024
6227361
chore: lint
cyriltovena Jun 18, 2024
f625252
refactor: Removes changes not required.
cyriltovena Jun 18, 2024
32b1d2c
chore: format
cyriltovena Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
)

type errTooFarBehind struct {
// original timestmap of the entry itself.
// original timestamp of the entry itself.
entryTs time.Time

// cutoff is the oldest acceptable timestamp of the `stream` that entry belongs to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (m *Metadata) EnsureBounds(from, through int64) {
if m.Through == 0 || through > m.Through {
m.Through = through
}

}

// NewTOCFromByteSlice return parsed TOC from given index byte slice.
Expand Down Expand Up @@ -1646,7 +1645,6 @@ func readFingerprintOffsetsTable(bs ByteSlice, off uint64) (FingerprintOffsets,
}

return res, d.Err()

}

// Close the reader and its underlying resources.
Expand Down Expand Up @@ -2074,7 +2072,7 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
if len(l) != 4*n {
return 0, nil, fmt.Errorf("unexpected postings length, should be %d bytes for %d postings, got %d bytes", 4*n, n, len(l))
}
return n, newBigEndianPostings(l), nil
return n, NewBigEndianPostings(l), nil
}

// LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series.
Expand Down Expand Up @@ -2335,7 +2333,6 @@ func (dec *Decoder) readChunkStatsV3(d *encoding.Decbuf, from, through int64) (r
}

return res, d.Err()

}

func (dec *Decoder) accumulateChunkStats(d *encoding.Decbuf, nChunks int, from, through int64) (res ChunkStats, err error) {
Expand Down Expand Up @@ -2372,16 +2369,13 @@ func (dec *Decoder) readChunkStatsPriorV3(d *encoding.Decbuf, seriesRef storage.
} else if chk.MinTime >= through {
break
}

}

return res, nil

}

// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(version int, b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {

d, fprint, err := dec.prepSeries(b, lbls, chks)
if err != nil {
return 0, err
Expand All @@ -2392,7 +2386,6 @@ func (dec *Decoder) Series(version int, b []byte, seriesRef storage.SeriesRef, f
return 0, errors.Wrapf(err, "series %s", lbls.String())
}
return fprint, nil

}

func (dec *Decoder) readChunks(version int, d *encoding.Decbuf, seriesRef storage.SeriesRef, from int64, through int64, chks *[]ChunkMeta) error {
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,22 +777,22 @@ func (it *ListPostings) Err() error {
return nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// BigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
type BigEndianPostings struct {
list []byte
cur uint32
}

func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
func NewBigEndianPostings(list []byte) *BigEndianPostings {
return &BigEndianPostings{list: list}
}

func (it *bigEndianPostings) At() storage.SeriesRef {
func (it *BigEndianPostings) At() storage.SeriesRef {
return storage.SeriesRef(it.cur)
}

func (it *bigEndianPostings) Next() bool {
func (it *BigEndianPostings) Next() bool {
if len(it.list) >= 4 {
it.cur = binary.BigEndian.Uint32(it.list)
it.list = it.list[4:]
Expand All @@ -801,7 +801,7 @@ func (it *bigEndianPostings) Next() bool {
return false
}

func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
func (it *BigEndianPostings) Seek(x storage.SeriesRef) bool {
if storage.SeriesRef(it.cur) >= x {
return true
}
Expand All @@ -821,7 +821,7 @@ func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
return false
}

func (it *bigEndianPostings) Err() error {
func (it *BigEndianPostings) Err() error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestBigEndian(t *testing.T) {
}

t.Run("Iteration", func(t *testing.T) {
bep := newBigEndianPostings(beLst)
bep := NewBigEndianPostings(beLst)
for i := 0; i < num; i++ {
require.True(t, bep.Next())
require.Equal(t, storage.SeriesRef(ls[i]), bep.At())
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestBigEndian(t *testing.T) {
},
}

bep := newBigEndianPostings(beLst)
bep := NewBigEndianPostings(beLst)

for _, v := range table {
require.Equal(t, v.found, bep.Seek(storage.SeriesRef(v.seek)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ func (c *IndexClient) Volume(ctx context.Context, userID string, from, through m
}

func (c *IndexClient) GetShards(ctx context.Context, userID string, from, through model.Time, targetBytesPerShard uint64, predicate chunk.Predicate) (*logproto.ShardsResponse, error) {

// TODO(owen-d): perf, this is expensive :(
var mtx sync.Mutex

Expand Down
183 changes: 183 additions & 0 deletions pkg/storage/wal/README.md

Large diffs are not rendered by default.

Loading
Loading