From 37b2e35575e362c894c03e176dbb650f61d9c6af Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 10 Jan 2024 21:13:24 +0530 Subject: [PATCH] chore: error index file path collision (#4297) --- enterprise/reporting/error_index/worker.go | 5 ++- .../reporting/error_index/worker_test.go | 42 +++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/enterprise/reporting/error_index/worker.go b/enterprise/reporting/error_index/worker.go index e3b396e66b..8fbbd2adbb 100644 --- a/enterprise/reporting/error_index/worker.go +++ b/enterprise/reporting/error_index/worker.go @@ -12,6 +12,8 @@ import ( "strconv" "time" + "github.com/google/uuid" + "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/samber/lo" @@ -191,6 +193,7 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) ([]*jobsdb if err != nil { return nil, fmt.Errorf("uploading aggregated payloads: %w", err) } + w.log.Debugn("successfully uploaded aggregated payloads", logger.NewStringField("location", uploadFile.Location)) statusList = append(statusList, lo.Map(jobWithPayloads, func(item jobWithPayload, index int) *jobsdb.JobStatusT { return &jobsdb.JobStatusT{ @@ -225,7 +228,7 @@ func (w *worker) uploadPayloads(ctx context.Context, payloads []payload) (*filem minFailedAt := payloads[0].FailedAtTime() maxFailedAt := payloads[len(payloads)-1].FailedAtTime() - filePath := path.Join(dir, fmt.Sprintf("%d_%d_%s.parquet", minFailedAt.Unix(), maxFailedAt.Unix(), w.config.instanceID)) + filePath := path.Join(dir, fmt.Sprintf("%d_%d_%s_%s.parquet", minFailedAt.Unix(), maxFailedAt.Unix(), w.config.instanceID, uuid.NewString())) f, err := os.Create(filePath) if err != nil { diff --git a/enterprise/reporting/error_index/worker_test.go b/enterprise/reporting/error_index/worker_test.go index fb144e766f..0e85d83cb5 100644 --- a/enterprise/reporting/error_index/worker_test.go +++ b/enterprise/reporting/error_index/worker_test.go @@ -12,11 +12,12 @@ import ( "path" "slices" "strconv" - "strings" "sync" "testing" "time" + "github.com/tidwall/gjson" + kitsync "github.com/rudderlabs/rudder-go-kit/sync" "github.com/minio/minio-go/v7" @@ -257,7 +258,7 @@ func TestWorkerWriter(t *testing.T) { }).LastValue()) lastFailedAt := failedAt.Add(time.Duration(len(jobs)-1) * time.Second) - filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet", + filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s**.parquet", minioResource.BucketName, w.sourceID, failedAt.Format("2006-01-02"), @@ -270,7 +271,7 @@ func TestWorkerWriter(t *testing.T) { require.Len(t, failedMessages, len(jobs)) require.EqualValues(t, payloads, failedMessages) - s3SelectPath := fmt.Sprintf("%s/%s/%s/%d_%d_%s.parquet", + s3SelectPathPrefix := fmt.Sprintf("%s/%s/%s/%d_%d_%s", w.sourceID, failedAt.Format("2006-01-02"), strconv.Itoa(failedAt.Hour()), @@ -278,8 +279,11 @@ func TestWorkerWriter(t *testing.T) { lastFailedAt.Unix(), instanceID, ) + objects := minioObjects(t, ctx, minioResource, s3SelectPathPrefix) + require.Len(t, objects, 1) + s3SelectQuery := fmt.Sprintf("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object WHERE failed_at >= %d AND failed_at <= %d", failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro()) - failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, s3SelectPath, s3SelectQuery) + failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, objects[0], s3SelectQuery) slices.SortFunc(failedMessagesUsing3Select, func(a, b payload) int { return a.FailedAtTime().Compare(b.FailedAtTime()) }) @@ -298,7 +302,17 @@ func TestWorkerWriter(t *testing.T) { require.Len(t, jr.Jobs, len(jobs)) lo.ForEach(jr.Jobs, func(item *jobsdb.JobT, index int) { - require.EqualValues(t, string(item.LastJobStatus.ErrorResponse), fmt.Sprintf(`{"location": "%s"}`, strings.Replace(filePath, "s3://", fmt.Sprintf("http://%s/", minioResource.Endpoint), 1))) + filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.+.parquet", + minioResource.Endpoint, + minioResource.BucketName, + w.sourceID, + failedAt.Format("2006-01-02"), + strconv.Itoa(failedAt.Hour()), + failedAt.Unix(), + lastFailedAt.Unix(), + instanceID, + ) + require.Regexp(t, filePath, gjson.GetBytes(item.LastJobStatus.ErrorResponse, "location").String()) }) }) t.Run("multiple hours and days", func(t *testing.T) { @@ -380,7 +394,7 @@ func TestWorkerWriter(t *testing.T) { for i := 0; i < count; i++ { failedAt := failedAt.Add(time.Duration(i) * time.Hour) - filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet", + filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s**.parquet", minioResource.BucketName, w.sourceID, failedAt.Format("2006-01-02"), @@ -406,7 +420,7 @@ func TestWorkerWriter(t *testing.T) { lo.ForEach(jr.Jobs, func(item *jobsdb.JobT, index int) { failedAt := failedAt.Add(time.Duration(index) * time.Hour) - filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.parquet", + filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.+.parquet", minioResource.Endpoint, minioResource.BucketName, w.sourceID, @@ -416,7 +430,7 @@ func TestWorkerWriter(t *testing.T) { failedAt.Unix(), instanceID, ) - require.EqualValues(t, string(item.LastJobStatus.ErrorResponse), fmt.Sprintf(`{"location": "%s"}`, strings.Replace(filePath, "s3://", fmt.Sprintf("http://%s/", minioResource.Endpoint), 1))) + require.Regexp(t, filePath, gjson.GetBytes(item.LastJobStatus.ErrorResponse, "location").String()) }) }) t.Run("limits reached but few left without crossing upload frequency", func(t *testing.T) { @@ -512,6 +526,18 @@ func TestWorkerWriter(t *testing.T) { }) } +func minioObjects(t testing.TB, ctx context.Context, mr *resource.MinioResource, prefix string) (objects []string) { + t.Helper() + + for objInfo := range mr.Client.ListObjects(ctx, mr.BucketName, minio.ListObjectsOptions{ + Recursive: true, + Prefix: prefix, + }) { + objects = append(objects, objInfo.Key) + } + return +} + func failedMessagesUsingMinioS3Select(t testing.TB, ctx context.Context, mr *resource.MinioResource, filePath, query string) []payload { t.Helper()