Skip to content

Commit

Permalink
chore: error index file path collision (#4297)
Browse files — browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Jan 10, 2024
1 parent 8f1e6b1 commit 37b2e35
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
5 changes: 4 additions & 1 deletion enterprise/reporting/error_index/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strconv"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/filemanager"

"github.com/samber/lo"
Expand Down Expand Up @@ -191,6 +193,7 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) ([]*jobsdb
if err != nil {
return nil, fmt.Errorf("uploading aggregated payloads: %w", err)
}
w.log.Debugn("successfully uploaded aggregated payloads", logger.NewStringField("location", uploadFile.Location))

statusList = append(statusList, lo.Map(jobWithPayloads, func(item jobWithPayload, index int) *jobsdb.JobStatusT {
return &jobsdb.JobStatusT{
Expand Down Expand Up @@ -225,7 +228,7 @@ func (w *worker) uploadPayloads(ctx context.Context, payloads []payload) (*filem
minFailedAt := payloads[0].FailedAtTime()
maxFailedAt := payloads[len(payloads)-1].FailedAtTime()

filePath := path.Join(dir, fmt.Sprintf("%d_%d_%s.parquet", minFailedAt.Unix(), maxFailedAt.Unix(), w.config.instanceID))
filePath := path.Join(dir, fmt.Sprintf("%d_%d_%s_%s.parquet", minFailedAt.Unix(), maxFailedAt.Unix(), w.config.instanceID, uuid.NewString()))

f, err := os.Create(filePath)
if err != nil {
Expand Down
42 changes: 34 additions & 8 deletions enterprise/reporting/error_index/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"path"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/tidwall/gjson"

kitsync "github.com/rudderlabs/rudder-go-kit/sync"

"github.com/minio/minio-go/v7"
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestWorkerWriter(t *testing.T) {
}).LastValue())

lastFailedAt := failedAt.Add(time.Duration(len(jobs)-1) * time.Second)
filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s**.parquet",
minioResource.BucketName,
w.sourceID,
failedAt.Format("2006-01-02"),
Expand All @@ -270,16 +271,19 @@ func TestWorkerWriter(t *testing.T) {
require.Len(t, failedMessages, len(jobs))
require.EqualValues(t, payloads, failedMessages)

s3SelectPath := fmt.Sprintf("%s/%s/%s/%d_%d_%s.parquet",
s3SelectPathPrefix := fmt.Sprintf("%s/%s/%s/%d_%d_%s",
w.sourceID,
failedAt.Format("2006-01-02"),
strconv.Itoa(failedAt.Hour()),
failedAt.Unix(),
lastFailedAt.Unix(),
instanceID,
)
objects := minioObjects(t, ctx, minioResource, s3SelectPathPrefix)
require.Len(t, objects, 1)

s3SelectQuery := fmt.Sprintf("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object WHERE failed_at >= %d AND failed_at <= %d", failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro())
failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, s3SelectPath, s3SelectQuery)
failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, objects[0], s3SelectQuery)
slices.SortFunc(failedMessagesUsing3Select, func(a, b payload) int {
return a.FailedAtTime().Compare(b.FailedAtTime())
})
Expand All @@ -298,7 +302,17 @@ func TestWorkerWriter(t *testing.T) {
require.Len(t, jr.Jobs, len(jobs))

lo.ForEach(jr.Jobs, func(item *jobsdb.JobT, index int) {
require.EqualValues(t, string(item.LastJobStatus.ErrorResponse), fmt.Sprintf(`{"location": "%s"}`, strings.Replace(filePath, "s3://", fmt.Sprintf("http://%s/", minioResource.Endpoint), 1)))
filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.+.parquet",
minioResource.Endpoint,
minioResource.BucketName,
w.sourceID,
failedAt.Format("2006-01-02"),
strconv.Itoa(failedAt.Hour()),
failedAt.Unix(),
lastFailedAt.Unix(),
instanceID,
)
require.Regexp(t, filePath, gjson.GetBytes(item.LastJobStatus.ErrorResponse, "location").String())
})
})
t.Run("multiple hours and days", func(t *testing.T) {
Expand Down Expand Up @@ -380,7 +394,7 @@ func TestWorkerWriter(t *testing.T) {

for i := 0; i < count; i++ {
failedAt := failedAt.Add(time.Duration(i) * time.Hour)
filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s**.parquet",
minioResource.BucketName,
w.sourceID,
failedAt.Format("2006-01-02"),
Expand All @@ -406,7 +420,7 @@ func TestWorkerWriter(t *testing.T) {

lo.ForEach(jr.Jobs, func(item *jobsdb.JobT, index int) {
failedAt := failedAt.Add(time.Duration(index) * time.Hour)
filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.parquet",
filePath := fmt.Sprintf("http://%s/%s/%s/%s/%s/%d_%d_%s.+.parquet",
minioResource.Endpoint,
minioResource.BucketName,
w.sourceID,
Expand All @@ -416,7 +430,7 @@ func TestWorkerWriter(t *testing.T) {
failedAt.Unix(),
instanceID,
)
require.EqualValues(t, string(item.LastJobStatus.ErrorResponse), fmt.Sprintf(`{"location": "%s"}`, strings.Replace(filePath, "s3://", fmt.Sprintf("http://%s/", minioResource.Endpoint), 1)))
require.Regexp(t, filePath, gjson.GetBytes(item.LastJobStatus.ErrorResponse, "location").String())
})
})
t.Run("limits reached but few left without crossing upload frequency", func(t *testing.T) {
Expand Down Expand Up @@ -512,6 +526,18 @@ func TestWorkerWriter(t *testing.T) {
})
}

// minioObjects lists the bucket recursively and returns the key of every
// object whose key begins with prefix.
func minioObjects(t testing.TB, ctx context.Context, mr *resource.MinioResource, prefix string) (objects []string) {
	t.Helper()

	// ListObjects streams results over a channel; drain it fully so the
	// underlying listing goroutine terminates.
	listing := mr.Client.ListObjects(ctx, mr.BucketName, minio.ListObjectsOptions{
		Prefix:    prefix,
		Recursive: true,
	})
	for info := range listing {
		objects = append(objects, info.Key)
	}
	return objects
}

func failedMessagesUsingMinioS3Select(t testing.TB, ctx context.Context, mr *resource.MinioResource, filePath, query string) []payload {
t.Helper()

Expand Down

0 comments on commit 37b2e35

Please sign in to comment.