From 32da9eb6356de2b03e69ff7c0bee950d1b0f2db9 Mon Sep 17 00:00:00 2001 From: mihirgandhi Date: Tue, 2 Jan 2024 13:26:01 +0530 Subject: [PATCH 1/5] feat: filter destinations for events from rETL in processor --- processor/processor.go | 29 ++++++++++++++++++++++------ processor/processor_test.go | 38 ++++++++++++++++++++++++++++--------- utils/types/types.go | 1 + 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 34373465b6..6eb7bd8e25 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1544,6 +1544,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf outCountMap := make(map[string]int64) // destinations enabled destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail) + // map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only + msgToDestMap := make(map[string]string) spans := make([]stats.TraceSpan, 0, len(jobList)) defer func() { @@ -1551,6 +1553,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf span.End() } }() + for _, batchEvent := range jobList { var eventParams types.EventParams err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams) @@ -1560,6 +1563,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf sourceID := eventParams.SourceId traceParent := eventParams.TraceParent + destinationID := eventParams.DestinationID var span stats.TraceSpan if traceParent == "" { @@ -1617,6 +1621,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // Iterate through all the events in the batch for _, singularEvent := range gatewayBatchEvent.Batch { messageId := misc.GetStringifiedData(singularEvent["messageId"]) + if len(destinationID) != 0 { + msgToDestMap[messageId] = destinationID + } payloadFunc := ro.Memoize(func() json.RawMessage { payloadBytes, err := 
jsonfast.Marshal(singularEvent) @@ -1724,8 +1731,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // REPORTING - GATEWAY metrics - END // Getting all the destinations which are enabled for this - // event - if !proc.isDestinationAvailable(singularEvent, sourceID) { + // event will be dropped if no valid destination is present + // if empty string destinationID is all the destinations for the source is validated + // else only passed destinationID will be validated + if !proc.isDestinationAvailable(singularEvent, sourceId, destinationID) { continue } @@ -1893,7 +1902,13 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf destType := &enabledDestTypes[i] enabledDestinationsList := proc.getConsentFilteredDestinations( singularEvent, - proc.getEnabledDestinations(sourceId, *destType), + lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool { + destId := msgToDestMap[event.Metadata.MessageID] + if len(destId) != 0 { + return destId == item.ID + } + return len(destId) == 0 + }), ) // Adding a singular event multiple times if there are multiple destinations of same type @@ -3073,7 +3088,7 @@ func (*Handle) getLimiterPriority(partition string) kitsync.LimiterPriorityValue // check if event has eligible destinations to send to // // event will be dropped if no destination is found -func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string) bool { +func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string, destinationID string) bool { enabledDestTypes := integrations.FilterClientIntegrations( event, proc.getBackendEnabledDestinationTypes(sourceId), @@ -3083,7 +3098,7 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId return false } - if enabledDestinationsList := proc.getConsentFilteredDestinations( + if enabledDestinationsList := 
lo.Filter(proc.getConsentFilteredDestinations( event, lo.Flatten( lo.Map( @@ -3093,7 +3108,9 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId }, ), ), - ); len(enabledDestinationsList) == 0 { + ), func(dest backendconfig.DestinationT, index int) bool { + return len(destinationID) == 0 || dest.ID == destinationID + }); len(enabledDestinationsList) == 0 { proc.logger.Debug("No destination to route this event to") return false } diff --git a/processor/processor_test.go b/processor/processor_test.go index bf051fbd6c..6e08e0face 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2772,7 +2772,7 @@ var _ = Describe("Processor", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) - Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithDeniedConsents, @@ -2783,7 +2783,7 @@ var _ = Describe("Processor", Ordered, func() { )), ).To(Equal(3)) // all except D1 and D3 - Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithoutDeniedConsents, @@ -2794,7 +2794,18 @@ var _ = Describe("Processor", Ordered, func() { )), ).To(Equal(5)) // all - Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "")).To(BeTrue()) + Expect( + len(processor.getConsentFilteredDestinations( + eventWithoutConsentManagementData, + processor.getEnabledDestinations( + SourceIDOneTrustConsent, + 
"destination-definition-name-enabled", + ), + )), + ).To(Equal(5)) // all + + Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "dest-id-1")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithoutConsentManagementData, @@ -2853,7 +2864,7 @@ var _ = Describe("Processor", Ordered, func() { ), ) Expect(len(filteredDestinations)).To(Equal(4)) // all except dest-id-5 since both purpose1 and purpose2 are denied - Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent)).To(BeTrue()) + Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent, "")).To(BeTrue()) }) It("should filter based on generic consent management preferences", func() { @@ -3010,7 +3021,7 @@ var _ = Describe("Processor", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) - Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithoutConsentManagementData, @@ -3021,7 +3032,7 @@ var _ = Describe("Processor", Ordered, func() { )), ).To(Equal(9)) // all - Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithoutDeniedConsentsGCM, @@ -3032,7 +3043,7 @@ var _ = Describe("Processor", Ordered, func() { )), ).To(Equal(9)) // all - Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithCustomConsentsGCM, @@ -3043,7 +3054,7 @@ var _ = 
Describe("Processor", Ordered, func() { )), ).To(Equal(8)) // all except D13 - Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithDeniedConsentsGCM, @@ -3054,7 +3065,7 @@ var _ = Describe("Processor", Ordered, func() { )), ).To(Equal(7)) // all except D6 and D7 - Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM)).To(BeTrue()) + Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "")).To(BeTrue()) Expect( len(processor.getConsentFilteredDestinations( eventWithDeniedConsentsGCMKetch, @@ -3064,6 +3075,15 @@ var _ = Describe("Processor", Ordered, func() { ), )), ).To(Equal(8)) // all except D7 + + // when an unknown destination ID is passed, the destination will be unavailable + Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "unknown-destination")).To(BeFalse()) + + // known destination ID is passed and destination is enabled + Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDEnabledA)).To(BeTrue()) + + // known destination ID is passed and destination is not enabled + Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDDisabled)).To(BeFalse()) }) }) diff --git a/utils/types/types.go b/utils/types/types.go index 578b99d434..6e2b0ce3cc 100644 --- a/utils/types/types.go +++ b/utils/types/types.go @@ -36,6 +36,7 @@ type EventParams struct { SourceId string `json:"source_id"` SourceTaskRunId string `json:"source_task_run_id"` TraceParent string `json:"traceparent"` + DestinationID string `json:"destination_id"` } // UserSuppression is interface to access Suppress user feature From b6ae3dc7dc54185764bad881887313d2fc324dcf Mon Sep 17 00:00:00 2001 From: mihirgandhi 
Date: Tue, 9 Jan 2024 15:57:10 +0530 Subject: [PATCH 2/5] rebased --- processor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/processor.go b/processor/processor.go index 6eb7bd8e25..f0f10d73f4 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1734,7 +1734,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // event will be dropped if no valid destination is present // if empty string destinationID is all the destinations for the source is validated // else only passed destinationID will be validated - if !proc.isDestinationAvailable(singularEvent, sourceId, destinationID) { + if !proc.isDestinationAvailable(singularEvent, sourceID, destinationID) { continue } From bf5f703bab847147f27f6116f7b4663e1f5a872a Mon Sep 17 00:00:00 2001 From: mihirgandhi Date: Tue, 9 Jan 2024 16:01:48 +0530 Subject: [PATCH 3/5] lint fix --- processor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/processor.go b/processor/processor.go index f0f10d73f4..5b18898121 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -3088,7 +3088,7 @@ func (*Handle) getLimiterPriority(partition string) kitsync.LimiterPriorityValue // check if event has eligible destinations to send to // // event will be dropped if no destination is found -func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string, destinationID string) bool { +func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId, destinationID string) bool { enabledDestTypes := integrations.FilterClientIntegrations( event, proc.getBackendEnabledDestinationTypes(sourceId), From 49550b3fedaa8f21d314cfca0de36ffb4f5577e4 Mon Sep 17 00:00:00 2001 From: mihirgandhi Date: Fri, 12 Jan 2024 17:34:32 +0530 Subject: [PATCH 4/5] added integration tests --- integration_test/docker_test/docker_test.go | 148 +++++++++++++++++++- 1 file changed, 145 insertions(+), 3 
deletions(-) diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index d24835ffcf..5d5827993d 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -98,7 +98,7 @@ func TestMainFlow(t *testing.T) { sendEventsToGateway(t) t.Run("webhook", func(t *testing.T) { require.Eventually(t, func() bool { - return webhook.RequestsCount() == 10 + return webhook.RequestsCount() == 11 }, time.Minute, 300*time.Millisecond) i := -1 @@ -224,7 +224,7 @@ func TestMainFlow(t *testing.T) { var ( msgCount = 0 // Count how many message processed - expectedCount = 10 + expectedCount = 15 timeout = time.After(2 * time.Minute) ) @@ -532,6 +532,131 @@ func sendEventsToGateway(t *testing.T) { }`) sendEvent(t, payloadGroup, "group", writeKey) sendPixelEvents(t, writeKey) + + payloadRetlWebhook := strings.NewReader(`{ + "batch": + [ + { + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + } + ] + }`) + sendEvent(t, payloadRetlWebhook, "retl", writeKey, + withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"), + withHeader("X-Rudder-Destination-Id", "xxxyyyzzP9kQfzOoKd1tuxchYAG"), + withUrlPath("/internal/v1/retl")) + + payloadRetlKafka := strings.NewReader(`{ + "batch": + [ + { + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "messageId":"messageId_11", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + },{ + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + 
"timestamp": "2020-02-02T00:23:09.544Z" + },{ + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + },{ + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + },{ + "userId": "identified_user_id", + "anonymousId": "anonymousId_1", + "type": "identify", + "context": + { + "traits": + { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": + { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + } + ] + }`) + sendEvent(t, payloadRetlKafka, "retl", writeKey, + withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"), + withHeader("X-Rudder-Destination-Id", "xxxyyyzzhyrw8v0CrTMrDZ4ovej"), + withUrlPath("/internal/v1/retl")) } func blockOnHold(t *testing.T) { @@ -597,7 +722,20 @@ func sendPixelEvents(t *testing.T, writeKey string) { } } -func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) { +func withHeader(key, value string) func(r *http.Request) { + return func(req *http.Request) { + req.Header.Add(key, value) + } +} + +// withUrlPath will override the path of url in request +func withUrlPath(urlPath string) func(r *http.Request) { + return func(req *http.Request) { + req.URL.Path = urlPath + } +} + +func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string, reqOptions ...func(r *http.Request)) { t.Helper() t.Logf("Sending %s Event", callType) @@ -618,6 +756,10 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) []byte(fmt.Sprintf("%s:", writeKey)), ))) + for _, reqOption := range reqOptions { + reqOption(req) + } + res, err := httpClient.Do(req) if err 
!= nil { t.Logf("sendEvent error: %v", err) From 9c3b414c578ec7dce71d19ce4cac876717088940 Mon Sep 17 00:00:00 2001 From: mihirgandhi Date: Mon, 15 Jan 2024 18:11:11 +0530 Subject: [PATCH 5/5] addressed comments --- integration_test/docker_test/docker_test.go | 58 +++++++++++++-------- processor/processor.go | 22 ++++---- 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index 5d5827993d..237c788722 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -556,10 +556,7 @@ func sendEventsToGateway(t *testing.T) { } ] }`) - sendEvent(t, payloadRetlWebhook, "retl", writeKey, - withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"), - withHeader("X-Rudder-Destination-Id", "xxxyyyzzP9kQfzOoKd1tuxchYAG"), - withUrlPath("/internal/v1/retl")) + sendRETL(t, payloadRetlWebhook, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzP9kQfzOoKd1tuxchYAG") payloadRetlKafka := strings.NewReader(`{ "batch": @@ -653,10 +650,7 @@ func sendEventsToGateway(t *testing.T) { } ] }`) - sendEvent(t, payloadRetlKafka, "retl", writeKey, - withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"), - withHeader("X-Rudder-Destination-Id", "xxxyyyzzhyrw8v0CrTMrDZ4ovej"), - withUrlPath("/internal/v1/retl")) + sendRETL(t, payloadRetlKafka, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzhyrw8v0CrTMrDZ4ovej") } func blockOnHold(t *testing.T) { @@ -722,20 +716,46 @@ func sendPixelEvents(t *testing.T, writeKey string) { } } -func withHeader(key, value string) func(r *http.Request) { - return func(req *http.Request) { - req.Header.Add(key, value) +func sendRETL(t *testing.T, payload *strings.Reader, sourceID, DestinationID string) { + t.Helper() + t.Logf("Sending rETL Event") + + var ( + httpClient = &http.Client{} + method = "POST" + url = fmt.Sprintf("http://localhost:%s/internal/v1/retl", httpPort) + ) + + req, err := http.NewRequest(method, url, 
payload) + if err != nil { + t.Logf("sendEvent error: %v", err) + return } -} -// withUrlPath will override the path of url in request -func withUrlPath(urlPath string) func(r *http.Request) { - return func(req *http.Request) { - req.URL.Path = urlPath + req.Header.Add("Content-Type", "application/json") + req.Header.Add("X-Rudder-Source-Id", sourceID) + req.Header.Add("X-Rudder-Destination-Id", DestinationID) + + res, err := httpClient.Do(req) + if err != nil { + t.Logf("sendEvent error: %v", err) + return + } + defer func() { httputil.CloseResponse(res) }() + + body, err := io.ReadAll(res.Body) + if err != nil { + t.Logf("sendEvent error: %v", err) + return + } + if res.Status != "200 OK" { + return } + + t.Logf("Event Sent Successfully: (%s)", body) } -func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string, reqOptions ...func(r *http.Request)) { +func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) { t.Helper() t.Logf("Sending %s Event", callType) @@ -756,10 +776,6 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string, []byte(fmt.Sprintf("%s:", writeKey)), ))) - for _, reqOption := range reqOptions { - reqOption(req) - } - res, err := httpClient.Do(req) if err != nil { t.Logf("sendEvent error: %v", err) diff --git a/processor/processor.go b/processor/processor.go index 5b18898121..76a9bde9f3 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1544,8 +1544,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf outCountMap := make(map[string]int64) // destinations enabled destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail) - // map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only - msgToDestMap := make(map[string]string) + // map of jobID to destinationID: for messages that need to be delivered to a specific destination only + jobIDToSpecificDestMapOnly := 
make(map[int64]string) spans := make([]stats.TraceSpan, 0, len(jobList)) defer func() { @@ -1564,6 +1564,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf sourceID := eventParams.SourceId traceParent := eventParams.TraceParent destinationID := eventParams.DestinationID + if destinationID != "" { + jobIDToSpecificDestMapOnly[batchEvent.JobID] = destinationID + } var span stats.TraceSpan if traceParent == "" { @@ -1621,9 +1624,6 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // Iterate through all the events in the batch for _, singularEvent := range gatewayBatchEvent.Batch { messageId := misc.GetStringifiedData(singularEvent["messageId"]) - if len(destinationID) != 0 { - msgToDestMap[messageId] = destinationID - } payloadFunc := ro.Memoize(func() json.RawMessage { payloadBytes, err := jsonfast.Marshal(singularEvent) @@ -1730,9 +1730,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf } // REPORTING - GATEWAY metrics - END - // Getting all the destinations which are enabled for this - // event will be dropped if no valid destination is present - // if empty string destinationID is all the destinations for the source is validated + // Getting all the destinations which are enabled for this event. 
+ // Event will be dropped if no valid destination is present. + // If an empty destinationID is passed, all the destinations for the source are validated, // else only passed destinationID will be validated if !proc.isDestinationAvailable(singularEvent, sourceID, destinationID) { continue @@ -1903,11 +1903,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf enabledDestinationsList := proc.getConsentFilteredDestinations( singularEvent, lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool { - destId := msgToDestMap[event.Metadata.MessageID] - if len(destId) != 0 { + destId := jobIDToSpecificDestMapOnly[event.Metadata.JobID] + if destId != "" { return destId == item.ID } - return len(destId) == 0 + return destId == "" }), )