diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go
index bcfd8237d71..d0ce910ca14 100644
--- a/pkg/parquetconverter/converter.go
+++ b/pkg/parquetconverter/converter.go
@@ -386,7 +386,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 				continue
 			}
 
-			if marker.Version == cortex_parquet.CurrentVersion {
+			// Skip blocks that already carry a valid converter mark (any known version); they are not converted again.
+			if cortex_parquet.ValidConverterMarkVersion(marker.Version) {
 				continue
 			}
 
diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go
index 46c9c4b18b8..03a96a68cf2 100644
--- a/pkg/parquetconverter/converter_test.go
+++ b/pkg/parquetconverter/converter_test.go
@@ -1,7 +1,9 @@
 package parquetconverter
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -414,3 +416,75 @@ func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
 func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
 	return m.limits
 }
+
+// TestConverter_SkipBlocksWithExistingValidMarker verifies that a block which already
+// carries a version 1 converter mark is left untouched by the converter.
+func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
+	cfg := prepareConfig()
+	user := "user"
+	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+	dir := t.TempDir()
+
+	cfg.Ring.InstanceID = "parquet-converter-1"
+	cfg.Ring.InstanceAddr = "1.2.3.4"
+	cfg.Ring.KVStore.Mock = ringStore
+	bucketClient, err := filesystem.NewBucket(t.TempDir())
+	require.NoError(t, err)
+	userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.ParquetConverterEnabled = true
+
+	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)
+
+	ctx := context.Background()
+
+	lbls := labels.FromStrings("__name__", "test")
+
+	// Create a block on local disk.
+	rnd := rand.New(rand.NewSource(time.Now().Unix()))
+	blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
+	require.NoError(t, err)
+
+	// Upload the block to the user's bucket.
+	blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
+	b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
+	require.NoError(t, err)
+	// Release the opened block when the test ends; nothing else in this test closes it.
+	t.Cleanup(func() { assert.NoError(t, b.Close()) })
+	err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
+	require.NoError(t, err)
+
+	// Write a converter mark with version 1 to simulate an already converted block.
+	markerV1 := parquet.ConverterMark{
+		Version: parquet.ParquetConverterMarkVersion1,
+	}
+	markerBytes, err := json.Marshal(markerV1)
+	require.NoError(t, err)
+	markerPath := path.Join(blockID.String(), parquet.ConverterMarkerFileName)
+	err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
+	require.NoError(t, err)
+
+	// Sanity check: the marker is readable and reports version 1 before the converter runs.
+	marker, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
+	require.NoError(t, err)
+	require.Equal(t, parquet.ParquetConverterMarkVersion1, marker.Version)
+
+	// Start the converter.
+	err = services.StartAndAwaitRunning(ctx, c)
+	require.NoError(t, err)
+	defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck
+
+	// Give the converter time to run over the bucket before asserting that nothing changed.
+	time.Sleep(5 * time.Second)
+
+	// The marker version must still be 1, i.e. the block was not converted again.
+	markerAfter, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
+	require.NoError(t, err)
+	require.Equal(t, parquet.ParquetConverterMarkVersion1, markerAfter.Version, "block with existing marker version 1 should not be converted again")
+
+	// The convertedBlocks metric for this user must stay at 0,
+	// since the block was already marked as converted.
+	assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
+}
diff --git a/pkg/storage/parquet/converter_marker.go b/pkg/storage/parquet/converter_marker.go
index 380b4244fed..4aeb41ebdfa 100644
--- a/pkg/storage/parquet/converter_marker.go
+++ b/pkg/storage/parquet/converter_marker.go
@@ -19,7 +19,14 @@ import (
 const (
 	ConverterMarkerPrefix   = "parquet-markers"
 	ConverterMarkerFileName = "parquet-converter-mark.json"
-	CurrentVersion          = 1
+
+	// CurrentVersion is the newest converter mark version.
+	CurrentVersion = ParquetConverterMarkVersion2
+	// ParquetConverterMarkVersion1 is the first converter mark version.
+	ParquetConverterMarkVersion1 = 1
+	// ParquetConverterMarkVersion2 has an additional series hash
+	// column which is used for projection pushdown.
+	ParquetConverterMarkVersion2 = 2
 )
 
 type ConverterMark struct {
@@ -64,3 +71,10 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
 type ConverterMarkMeta struct {
 	Version int `json:"version"`
 }
+
+// ValidConverterMarkVersion reports whether version is one of the known
+// converter mark versions (ParquetConverterMarkVersion1 or
+// ParquetConverterMarkVersion2). Any other value, including 0, is invalid.
+func ValidConverterMarkVersion(version int) bool {
+	return version == ParquetConverterMarkVersion1 || version == ParquetConverterMarkVersion2
+}