Skip to content

Commit 3be17a4

Browse files
committed
feat: filter destinations for events from rETL in processor
1 parent 08c0864 commit 3be17a4

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

processor/processor.go

+23-6
Original file line numberDiff line numberDiff line change
@@ -1528,6 +1528,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15281528
outCountMap := make(map[string]int64) // destinations enabled
15291529
destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail)
15301530

1531+
// map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only
1532+
msgToDestMap := make(map[string]string)
1533+
15311534
for _, batchEvent := range jobList {
15321535

15331536
var gatewayBatchEvent types.GatewayBatchRequest
@@ -1542,6 +1545,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15421545
panic(err)
15431546
}
15441547
sourceId := eventParams.SourceId
1548+
destinationID := eventParams.DestinationID
15451549
requestIP := gatewayBatchEvent.RequestIP
15461550
receivedAt := gatewayBatchEvent.ReceivedAt
15471551

@@ -1573,6 +1577,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15731577
// Iterate through all the events in the batch
15741578
for _, singularEvent := range gatewayBatchEvent.Batch {
15751579
messageId := misc.GetStringifiedData(singularEvent["messageId"])
1580+
if len(destinationID) != 0 {
1581+
msgToDestMap[messageId] = destinationID
1582+
}
15761583

15771584
payloadFunc := ro.Memoize(func() json.RawMessage {
15781585
payloadBytes, err := jsonfast.Marshal(singularEvent)
@@ -1680,8 +1687,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
16801687
// REPORTING - GATEWAY metrics - END
16811688

16821689
// Getting all the destinations which are enabled for this
1683-
// event
1684-
if !proc.isDestinationAvailable(singularEvent, sourceId) {
1690+
// event will be dropped if no valid destination is present
1691+
// if empty string destinationID is all the destinations for the source is validated
1692+
// else only passed destinationID will be validated
1693+
if !proc.isDestinationAvailable(singularEvent, sourceId, destinationID) {
16851694
continue
16861695
}
16871696

@@ -1849,7 +1858,13 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
18491858
destType := &enabledDestTypes[i]
18501859
enabledDestinationsList := proc.getConsentFilteredDestinations(
18511860
singularEvent,
1852-
proc.getEnabledDestinations(sourceId, *destType),
1861+
lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool {
1862+
destId := msgToDestMap[event.Metadata.MessageID]
1863+
if len(destId) != 0 {
1864+
return destId == item.ID
1865+
}
1866+
return len(destId) == 0
1867+
}),
18531868
)
18541869

18551870
// Adding a singular event multiple times if there are multiple destinations of same type
@@ -2984,7 +2999,7 @@ func (*Handle) getLimiterPriority(partition string) kitsync.LimiterPriorityValue
29842999
// check if event has eligible destinations to send to
29853000
//
29863001
// event will be dropped if no destination is found
2987-
func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string) bool {
3002+
func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string, destinationID string) bool {
29883003
enabledDestTypes := integrations.FilterClientIntegrations(
29893004
event,
29903005
proc.getBackendEnabledDestinationTypes(sourceId),
@@ -2994,7 +3009,7 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId
29943009
return false
29953010
}
29963011

2997-
if enabledDestinationsList := proc.getConsentFilteredDestinations(
3012+
if enabledDestinationsList := lo.Filter(proc.getConsentFilteredDestinations(
29983013
event,
29993014
lo.Flatten(
30003015
lo.Map(
@@ -3004,7 +3019,9 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId
30043019
},
30053020
),
30063021
),
3007-
); len(enabledDestinationsList) == 0 {
3022+
), func(dest backendconfig.DestinationT, index int) bool {
3023+
return len(destinationID) == 0 || dest.ID == destinationID
3024+
}); len(enabledDestinationsList) == 0 {
30083025
proc.logger.Debug("No destination to route this event to")
30093026
return false
30103027
}

processor/processor_test.go

+29-9
Original file line numberDiff line numberDiff line change
@@ -2736,7 +2736,7 @@ var _ = Describe("Processor", Ordered, func() {
27362736
defer cancel()
27372737
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
27382738

2739-
Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue())
2739+
Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue())
27402740
Expect(
27412741
len(processor.getConsentFilteredDestinations(
27422742
eventWithDeniedConsents,
@@ -2747,7 +2747,7 @@ var _ = Describe("Processor", Ordered, func() {
27472747
)),
27482748
).To(Equal(3)) // all except D1 and D3
27492749

2750-
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue())
2750+
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue())
27512751
Expect(
27522752
len(processor.getConsentFilteredDestinations(
27532753
eventWithoutDeniedConsents,
@@ -2758,7 +2758,18 @@ var _ = Describe("Processor", Ordered, func() {
27582758
)),
27592759
).To(Equal(5)) // all
27602760

2761-
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent)).To(BeTrue())
2761+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "")).To(BeTrue())
2762+
Expect(
2763+
len(processor.getConsentFilteredDestinations(
2764+
eventWithoutConsentManagementData,
2765+
processor.getEnabledDestinations(
2766+
SourceIDOneTrustConsent,
2767+
"destination-definition-name-enabled",
2768+
),
2769+
)),
2770+
).To(Equal(5)) // all
2771+
2772+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "dest-id-1")).To(BeTrue())
27622773
Expect(
27632774
len(processor.getConsentFilteredDestinations(
27642775
eventWithoutConsentManagementData,
@@ -2817,7 +2828,7 @@ var _ = Describe("Processor", Ordered, func() {
28172828
),
28182829
)
28192830
Expect(len(filteredDestinations)).To(Equal(4)) // all except dest-id-5 since both purpose1 and purpose2 are denied
2820-
Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent)).To(BeTrue())
2831+
Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent, "")).To(BeTrue())
28212832
})
28222833

28232834
It("should filter based on generic consent management preferences", func() {
@@ -2974,7 +2985,7 @@ var _ = Describe("Processor", Ordered, func() {
29742985
defer cancel()
29752986
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
29762987

2977-
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM)).To(BeTrue())
2988+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM, "")).To(BeTrue())
29782989
Expect(
29792990
len(processor.getConsentFilteredDestinations(
29802991
eventWithoutConsentManagementData,
@@ -2985,7 +2996,7 @@ var _ = Describe("Processor", Ordered, func() {
29852996
)),
29862997
).To(Equal(9)) // all
29872998

2988-
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM)).To(BeTrue())
2999+
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue())
29893000
Expect(
29903001
len(processor.getConsentFilteredDestinations(
29913002
eventWithoutDeniedConsentsGCM,
@@ -2996,7 +3007,7 @@ var _ = Describe("Processor", Ordered, func() {
29963007
)),
29973008
).To(Equal(9)) // all
29983009

2999-
Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM)).To(BeTrue())
3010+
Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM, "")).To(BeTrue())
30003011
Expect(
30013012
len(processor.getConsentFilteredDestinations(
30023013
eventWithCustomConsentsGCM,
@@ -3007,7 +3018,7 @@ var _ = Describe("Processor", Ordered, func() {
30073018
)),
30083019
).To(Equal(8)) // all except D13
30093020

3010-
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM)).To(BeTrue())
3021+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue())
30113022
Expect(
30123023
len(processor.getConsentFilteredDestinations(
30133024
eventWithDeniedConsentsGCM,
@@ -3018,7 +3029,7 @@ var _ = Describe("Processor", Ordered, func() {
30183029
)),
30193030
).To(Equal(7)) // all except D6 and D7
30203031

3021-
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM)).To(BeTrue())
3032+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "")).To(BeTrue())
30223033
Expect(
30233034
len(processor.getConsentFilteredDestinations(
30243035
eventWithDeniedConsentsGCMKetch,
@@ -3028,6 +3039,15 @@ var _ = Describe("Processor", Ordered, func() {
30283039
),
30293040
)),
30303041
).To(Equal(8)) // all except D7
3042+
3043+
// some unknown destination ID is passed destination will be unavailable
3044+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "unknown-destination")).To(BeFalse())
3045+
3046+
// known destination ID is passed and destination is enabled
3047+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDEnabledA)).To(BeTrue())
3048+
3049+
// know destination ID is passed and destination is not enabled
3050+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDDisabled)).To(BeFalse())
30313051
})
30323052
})
30333053

utils/types/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type EventParams struct {
3535
SourceJobRunId string `json:"source_job_run_id"`
3636
SourceId string `json:"source_id"`
3737
SourceTaskRunId string `json:"source_task_run_id"`
38+
DestinationID string `json:"destination_id"`
3839
}
3940

4041
// UserSuppression is interface to access Suppress user feature

0 commit comments

Comments
 (0)