diff --git a/pkg/storage/parquet/converter_marker.go b/pkg/storage/parquet/converter_marker.go index 5927c5f083c..d53f24d413c 100644 --- a/pkg/storage/parquet/converter_marker.go +++ b/pkg/storage/parquet/converter_marker.go @@ -36,7 +36,7 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr return &ConverterMark{}, err } - defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader") + defer runutil.CloseWithLogOnErr(logger, reader, "close parquet converter marker file reader") metaContent, err := io.ReadAll(reader) if err != nil { @@ -59,3 +59,8 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck } return userBkt.Upload(ctx, markerPath, bytes.NewReader(b)) } + +// ConverterMarkMeta is used in Bucket Index. It might not be the same as ConverterMark. +type ConverterMarkMeta struct { + Version int `json:"version"` +} diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 7892eef890b..6a32b0e567b 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -3,6 +3,7 @@ package bucketindex import ( "fmt" "path/filepath" + "slices" "strings" "time" @@ -11,6 +12,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" ) @@ -51,14 +53,14 @@ func (idx *Index) GetUpdatedAt() time.Time { func (idx *Index) RemoveBlock(id ulid.ULID) { for i := 0; i < len(idx.Blocks); i++ { if idx.Blocks[i].ID == id { - idx.Blocks = append(idx.Blocks[:i], idx.Blocks[i+1:]...) + idx.Blocks = slices.Delete(idx.Blocks, i, i+1) break } } for i := 0; i < len(idx.BlockDeletionMarks); i++ { if idx.BlockDeletionMarks[i].ID == id { - idx.BlockDeletionMarks = append(idx.BlockDeletionMarks[:i], idx.BlockDeletionMarks[i+1:]...) + idx.BlockDeletionMarks = slices.Delete(idx.BlockDeletionMarks, i, i+1) break } } @@ -91,6 +93,9 @@ type Block struct { // UploadedAt is a unix timestamp (seconds precision) of when the block has been completed to be uploaded // to the storage. UploadedAt int64 `json:"uploaded_at"` + + // Parquet metadata if exists. If doesn't exist it will be nil. + Parquet *parquet.ConverterMarkMeta `json:"parquet,omitempty"` } // Within returns whether the block contains samples within the provided range. diff --git a/pkg/storage/tsdb/bucketindex/markers.go b/pkg/storage/tsdb/bucketindex/markers.go index fd67161e844..89b0bf7db61 100644 --- a/pkg/storage/tsdb/bucketindex/markers.go +++ b/pkg/storage/tsdb/bucketindex/markers.go @@ -82,6 +82,24 @@ func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) { return id, err == nil } +// IsBlockParquetConverterMarkFilename returns whether the input filename matches the expected pattern +// of block parquet converter markers stored in the markers location. +func IsBlockParquetConverterMarkFilename(name string) (ulid.ULID, bool) { + parts := strings.SplitN(name, "-", 2) + if len(parts) != 2 { + return ulid.ULID{}, false + } + + // Ensure the 2nd part matches the parquet converter mark filename. + if parts[1] != parquet.ConverterMarkerFileName { + return ulid.ULID{}, false + } + + // Ensure the 1st part is a valid block ID. + id, err := ulid.Parse(filepath.Base(parts[0])) + return id, err == nil +} + // MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for // a deletion mark in the block location. Found deletion marks are copied to the global markers location. // The migration continues on error and returns once all blocks have been checked. diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index cee3e6e3bf4..458316ab2e8 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -15,6 +15,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -33,8 +34,9 @@ var ( // Updater is responsible to generate an update in-memory bucket index. type Updater struct { - bkt objstore.InstrumentedBucket - logger log.Logger + bkt objstore.InstrumentedBucket + logger log.Logger + parquetEnabled bool } func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater { @@ -44,11 +46,18 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon } } +func (w *Updater) EnableParquet() *Updater { + w.parquetEnabled = true + return w +} + // UpdateIndex generates the bucket index and returns it, without storing it to the storage. // If the old index is not passed in input, then the bucket index will be generated from scratch. func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, int64, error) { - var oldBlocks []*Block - var oldBlockDeletionMarks []*BlockDeletionMark + var ( + oldBlocks []*Block + oldBlockDeletionMarks []*BlockDeletionMark + ) // Read the old index, if provided. if old != nil { @@ -65,6 +74,11 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid if err != nil { return nil, nil, 0, err } + if w.parquetEnabled { + if err := w.updateParquetBlocks(ctx, blocks); err != nil { + return nil, nil, 0, err + } + } return &Index{ Version: IndexVersion1, @@ -180,6 +194,23 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo return block, nil } +func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID, block *Block) error { + marker, err := parquet.ReadConverterMark(ctx, id, w.bkt, w.logger) + if err != nil { + return errors.Wrapf(err, "read parquet converter marker file: %v", path.Join(id.String(), parquet.ConverterMarkerFileName)) + } + // Could be not found or access denied. + // Just treat it as no parquet block available. + if marker == nil || marker.Version == 0 { + return nil + } + + block.Parquet = &parquet.ConverterMarkMeta{ + Version: marker.Version, + } + return nil +} + func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, map[ulid.ULID]struct{}, int64, error) { out := make([]*BlockDeletionMark, 0, len(old)) deletedBlocks := map[ulid.ULID]struct{}{} @@ -249,3 +280,31 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid return BlockDeletionMarkFromThanosMarker(&m), nil } + +func (w *Updater) updateParquetBlocks(ctx context.Context, blocks []*Block) error { + discoveredParquetBlocks := map[ulid.ULID]struct{}{} + + // Find all parquet markers in the storage. + if err := w.bkt.Iter(ctx, parquet.ConverterMarkerPrefix+"/", func(name string) error { + if blockID, ok := IsBlockParquetConverterMarkFilename(path.Base(name)); ok { + discoveredParquetBlocks[blockID] = struct{}{} + } + + return nil + }); err != nil { + return errors.Wrap(err, "list block parquet converter marks") + } + + // Check if parquet mark has been uploaded or deleted for the block. + for _, m := range blocks { + if _, ok := discoveredParquetBlocks[m.ID]; ok { + if err := w.updateParquetBlockIndexEntry(ctx, m.ID, m); err != nil { + return err + } + } else if m.Parquet != nil { + // Converter marker removed. Reset parquet field. + m.Parquet = nil + } + } + return nil +} diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index 21fdcb234b8..9b2bc58b72c 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -3,6 +3,7 @@ package bucketindex import ( "bytes" "context" + "encoding/json" "path" "strings" "testing" @@ -21,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) @@ -301,6 +303,150 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { } } +func TestUpdater_UpdateIndex_WithParquet(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // Generate the initial index. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) + block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + // Add parquet marker to block 1. + block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1) + + w := NewUpdater(bkt, userID, nil, logger).EnableParquet() + returnedIdx, _, _, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID, + []tsdb.BlockMeta{block1, block2}, + []*metadata.DeletionMark{block2Mark}, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1ParquetMark.Version}, + }) + + // Create new blocks, and update the index. + block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) + block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50) + block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4) + + returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID, + []tsdb.BlockMeta{block1, block2, block3, block4}, + []*metadata.DeletionMark{block2Mark, block4Mark}, + map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1ParquetMark.Version}, + }) + + // Hard delete a block and update the index. + require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID)) + + returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID, + []tsdb.BlockMeta{block1, block3, block4}, + []*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1ParquetMark.Version}, + }) + + // Upload parquet marker to an old block and update index + block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3) + returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID, + []tsdb.BlockMeta{block1, block3, block4}, + []*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1ParquetMark.Version}, + block3.ULID.String(): {Version: block3ParquetMark.Version}, + }) +} + +func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) { + const userID = "user-1" + ctx := context.Background() + logger := log.NewNopLogger() + + tests := []struct { + name string + setupBucket func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket + expectedError error + expectParquet bool + expectParquetMeta *parquet.ConverterMarkMeta + }{ + { + name: "should successfully read parquet marker", + setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket { + parquetMark := parquet.ConverterMarkMeta{ + Version: 1, + } + data, err := json.Marshal(parquetMark) + require.NoError(t, err) + require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName), bytes.NewReader(data))) + return bkt + }, + expectedError: nil, + expectParquet: true, + expectParquetMeta: &parquet.ConverterMarkMeta{Version: 1}, + }, + { + name: "should handle missing parquet marker", + setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket { + // Don't upload any parquet marker + return bkt + }, + expectedError: nil, + expectParquet: false, + }, + { + name: "should handle access denied", + setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket { + return &testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName): testutil.ErrKeyAccessDeniedError, + }, + } + }, + expectedError: nil, + expectParquet: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + bkt, _ := testutil.PrepareFilesystemBucket(t) + blockID := ulid.MustNew(1, nil) + block := &Block{ID: blockID} + + // Setup the bucket with test data + bkt = tc.setupBucket(t, bkt, blockID) + + // Create an instrumented bucket wrapper + registry := prometheus.NewRegistry() + ibkt := objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket") + w := NewUpdater(ibkt, userID, nil, logger) + + err := w.updateParquetBlockIndexEntry(ctx, blockID, block) + if tc.expectedError != nil { + assert.True(t, errors.Is(err, tc.expectedError)) + } else { + assert.NoError(t, err) + } + + if tc.expectParquet { + assert.NotNil(t, block.Parquet) + assert.Equal(t, tc.expectParquetMeta, block.Parquet) + } else { + assert.Nil(t, block.Parquet) + } + }) + } +} + func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 { metaFile := path.Join(userID, blockID.String(), block.MetaFilename) @@ -338,3 +484,36 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks) } + +func assertBucketIndexEqualWithParquet(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, parquetBlocks map[string]*parquet.ConverterMarkMeta) { + assert.Equal(t, IndexVersion1, idx.Version) + assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2) + + // Build the list of expected block index entries. + var expectedBlockEntries []*Block + for _, b := range expectedBlocks { + block := &Block{ + ID: b.ULID, + MinTime: b.MinTime, + MaxTime: b.MaxTime, + UploadedAt: getBlockUploadedAt(t, bkt, userID, b.ULID), + } + if meta, ok := parquetBlocks[b.ULID.String()]; ok { + block.Parquet = meta + } + expectedBlockEntries = append(expectedBlockEntries, block) + } + + assert.ElementsMatch(t, expectedBlockEntries, idx.Blocks) + + // Build the list of expected block deletion mark index entries. + var expectedMarkEntries []*BlockDeletionMark + for _, m := range expectedDeletionMarks { + expectedMarkEntries = append(expectedMarkEntries, &BlockDeletionMark{ + ID: m.ID, + DeletionTime: m.DeletionTime, + }) + } + + assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks) +} diff --git a/pkg/storage/tsdb/testutil/block_mock.go b/pkg/storage/tsdb/testutil/block_mock.go index 8aad109734b..5ceb9f2c34b 100644 --- a/pkg/storage/tsdb/testutil/block_mock.go +++ b/pkg/storage/tsdb/testutil/block_mock.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/parquet" ) func MockStorageBlock(t testing.TB, bucket objstore.Bucket, userID string, minT, maxT int64) tsdb.BlockMeta { @@ -86,3 +88,20 @@ func MockStorageNonCompactionMark(t testing.TB, bucket objstore.Bucket, userID s return &mark } + +func MockStorageParquetConverterMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *parquet.ConverterMark { + mark := parquet.ConverterMark{ + Version: 1, + } + + markContent, err := json.Marshal(mark) + if err != nil { + panic("failed to marshal mocked parquet converter marker") + } + + markContentReader := strings.NewReader(string(markContent)) + markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), parquet.ConverterMarkerFileName) + require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader)) + + return &mark +}