Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rf1): Store index ref in metastore #13613

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 70 additions & 29 deletions pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/ingester-rf1/metastore/metastorepb/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ message BlockMeta {
int64 min_time = 3;
int64 max_time = 4;
uint32 compaction_level = 6;
uint64 indexRef = 7;

repeated TenantStreams tenant_streams = 7;
repeated TenantStreams tenant_streams = 8;
}

// TenantStreams object points to the offset in the block at which
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/wal/ref.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package wal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file can be removed now


// DataRef packs the location of a byte range into a single uint64: the
// offset is stored in the upper 32 bits and the size in the lower 32 bits.
type DataRef uint64

// NewDataRef builds a DataRef from an offset and a size.
//
// Each component only has 32 bits of storage. The size is truncated to its
// lower 32 bits before packing so that an oversized value cannot spill into
// (and silently corrupt) the offset bits. Offset bits above the 32nd are
// shifted out and lost; callers must keep both values below 1<<32.
func NewDataRef(offset, size uint64) DataRef {
	return DataRef(offset<<32 | uint64(uint32(size)))
}

func (b DataRef) Unpack() (int, int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason to pack these values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None really; we could use 2 int64 or 2 int32. The code was already available, so I copy-pasted it.

Would you prefer to change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're using it elsewhere then I'm happy to continue using it. If this is the first usage I'm less inclined to introduce this as a way of storing multiple values unless there is a good reason (maybe due to perf, storage concerns etc.)

offset := int(b >> 32)
size := int((b << 32) >> 32)
return offset, size
}
4 changes: 4 additions & 0 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type SegmentWriter struct {
inputSize atomic.Int64
idxWriter *index.Writer
consistencyMtx *sync.RWMutex
indexRef DataRef
}

type streamSegment struct {
Expand Down Expand Up @@ -209,6 +210,7 @@ func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta {
Id: id,
FormatVersion: uint64(1),
CompactionLevel: 0,
IndexRef: uint64(b.indexRef),
MinTime: globalMinT,
MaxTime: globalMaxT,
TenantStreams: result,
Expand Down Expand Up @@ -308,6 +310,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
if n != len(buf) {
return total, errors.New("invalid written index len")
}
b.indexRef = NewDataRef(uint64(total), uint64(n))
total += int64(n)

// write index len 4b
Expand Down Expand Up @@ -349,6 +352,7 @@ func (b *SegmentWriter) Reset() {
b.streams = make(map[streamID]*streamSegment, 64)
b.buf1.Reset()
b.inputSize.Store(0)
b.indexRef = 0
}

// InputSize returns the total size of the input data written to the writer.
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"

"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -448,6 +449,8 @@ func TestReset(t *testing.T) {

func Test_Meta(t *testing.T) {
w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
buff := bytes.NewBuffer(nil)

require.NoError(t, err)

lbls := labels.FromStrings("container", "foo", "namespace", "dev")
Expand All @@ -462,14 +465,23 @@ func Test_Meta(t *testing.T) {
{Timestamp: time.Unix(3, 0), Line: "Entry 2"},
{Timestamp: time.Unix(4, 0), Line: "Entry 3"},
})
_, err = w.WriteTo(buff)
require.NoError(t, err)
meta := w.Meta("bar")
ref := DataRef(meta.IndexRef)
offset, size := ref.Unpack()
indexReader, err := index.NewReader(index.RealByteSlice(buff.Bytes()[offset : offset+size]))
require.NoError(t, err)

defer indexReader.Close()

require.Equal(t, &metastorepb.BlockMeta{
FormatVersion: 1,
Id: "bar",
MinTime: time.Unix(1, 0).UnixNano(),
MaxTime: time.Unix(4, 0).UnixNano(),
CompactionLevel: 0,
IndexRef: meta.IndexRef,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenanta",
Expand Down
Loading