Skip to content

Commit 32da9eb

Browse files
committed
feat: filter destinations for events from rETL in processor
1 parent 866393d commit 32da9eb

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

processor/processor.go

+23-6
Original file line numberDiff line numberDiff line change
@@ -1544,13 +1544,16 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15441544

15451545
outCountMap := make(map[string]int64) // destinations enabled
15461546
destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail)
1547+
// map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only
1548+
msgToDestMap := make(map[string]string)
15471549

15481550
spans := make([]stats.TraceSpan, 0, len(jobList))
15491551
defer func() {
15501552
for _, span := range spans {
15511553
span.End()
15521554
}
15531555
}()
1556+
15541557
for _, batchEvent := range jobList {
15551558
var eventParams types.EventParams
15561559
err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams)
@@ -1560,6 +1563,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15601563

15611564
sourceID := eventParams.SourceId
15621565
traceParent := eventParams.TraceParent
1566+
destinationID := eventParams.DestinationID
15631567

15641568
var span stats.TraceSpan
15651569
if traceParent == "" {
@@ -1617,6 +1621,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
16171621
// Iterate through all the events in the batch
16181622
for _, singularEvent := range gatewayBatchEvent.Batch {
16191623
messageId := misc.GetStringifiedData(singularEvent["messageId"])
1624+
if len(destinationID) != 0 {
1625+
msgToDestMap[messageId] = destinationID
1626+
}
16201627

16211628
payloadFunc := ro.Memoize(func() json.RawMessage {
16221629
payloadBytes, err := jsonfast.Marshal(singularEvent)
@@ -1724,8 +1731,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
17241731
// REPORTING - GATEWAY metrics - END
17251732

17261733
// Getting all the destinations which are enabled for this
1727-
// event
1728-
if !proc.isDestinationAvailable(singularEvent, sourceID) {
1734+
// event will be dropped if no valid destination is present
1735+
// if empty string destinationID is all the destinations for the source is validated
1736+
// else only passed destinationID will be validated
1737+
if !proc.isDestinationAvailable(singularEvent, sourceId, destinationID) {
17291738
continue
17301739
}
17311740

@@ -1893,7 +1902,13 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
18931902
destType := &enabledDestTypes[i]
18941903
enabledDestinationsList := proc.getConsentFilteredDestinations(
18951904
singularEvent,
1896-
proc.getEnabledDestinations(sourceId, *destType),
1905+
lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool {
1906+
destId := msgToDestMap[event.Metadata.MessageID]
1907+
if len(destId) != 0 {
1908+
return destId == item.ID
1909+
}
1910+
return len(destId) == 0
1911+
}),
18971912
)
18981913

18991914
// Adding a singular event multiple times if there are multiple destinations of same type
@@ -3073,7 +3088,7 @@ func (*Handle) getLimiterPriority(partition string) kitsync.LimiterPriorityValue
30733088
// check if event has eligible destinations to send to
30743089
//
30753090
// event will be dropped if no destination is found
3076-
func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string) bool {
3091+
func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string, destinationID string) bool {
30773092
enabledDestTypes := integrations.FilterClientIntegrations(
30783093
event,
30793094
proc.getBackendEnabledDestinationTypes(sourceId),
@@ -3083,7 +3098,7 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId
30833098
return false
30843099
}
30853100

3086-
if enabledDestinationsList := proc.getConsentFilteredDestinations(
3101+
if enabledDestinationsList := lo.Filter(proc.getConsentFilteredDestinations(
30873102
event,
30883103
lo.Flatten(
30893104
lo.Map(
@@ -3093,7 +3108,9 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId
30933108
},
30943109
),
30953110
),
3096-
); len(enabledDestinationsList) == 0 {
3111+
), func(dest backendconfig.DestinationT, index int) bool {
3112+
return len(destinationID) == 0 || dest.ID == destinationID
3113+
}); len(enabledDestinationsList) == 0 {
30973114
proc.logger.Debug("No destination to route this event to")
30983115
return false
30993116
}

processor/processor_test.go

+29-9
Original file line numberDiff line numberDiff line change
@@ -2772,7 +2772,7 @@ var _ = Describe("Processor", Ordered, func() {
27722772
defer cancel()
27732773
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
27742774

2775-
Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue())
2775+
Expect(processor.isDestinationAvailable(eventWithDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue())
27762776
Expect(
27772777
len(processor.getConsentFilteredDestinations(
27782778
eventWithDeniedConsents,
@@ -2783,7 +2783,7 @@ var _ = Describe("Processor", Ordered, func() {
27832783
)),
27842784
).To(Equal(3)) // all except D1 and D3
27852785

2786-
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent)).To(BeTrue())
2786+
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsents, SourceIDOneTrustConsent, "")).To(BeTrue())
27872787
Expect(
27882788
len(processor.getConsentFilteredDestinations(
27892789
eventWithoutDeniedConsents,
@@ -2794,7 +2794,18 @@ var _ = Describe("Processor", Ordered, func() {
27942794
)),
27952795
).To(Equal(5)) // all
27962796

2797-
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent)).To(BeTrue())
2797+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "")).To(BeTrue())
2798+
Expect(
2799+
len(processor.getConsentFilteredDestinations(
2800+
eventWithoutConsentManagementData,
2801+
processor.getEnabledDestinations(
2802+
SourceIDOneTrustConsent,
2803+
"destination-definition-name-enabled",
2804+
),
2805+
)),
2806+
).To(Equal(5)) // all
2807+
2808+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDOneTrustConsent, "dest-id-1")).To(BeTrue())
27982809
Expect(
27992810
len(processor.getConsentFilteredDestinations(
28002811
eventWithoutConsentManagementData,
@@ -2853,7 +2864,7 @@ var _ = Describe("Processor", Ordered, func() {
28532864
),
28542865
)
28552866
Expect(len(filteredDestinations)).To(Equal(4)) // all except dest-id-5 since both purpose1 and purpose2 are denied
2856-
Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent)).To(BeTrue())
2867+
Expect(processor.isDestinationAvailable(event, SourceIDKetchConsent, "")).To(BeTrue())
28572868
})
28582869

28592870
It("should filter based on generic consent management preferences", func() {
@@ -3010,7 +3021,7 @@ var _ = Describe("Processor", Ordered, func() {
30103021
defer cancel()
30113022
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
30123023

3013-
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM)).To(BeTrue())
3024+
Expect(processor.isDestinationAvailable(eventWithoutConsentManagementData, SourceIDGCM, "")).To(BeTrue())
30143025
Expect(
30153026
len(processor.getConsentFilteredDestinations(
30163027
eventWithoutConsentManagementData,
@@ -3021,7 +3032,7 @@ var _ = Describe("Processor", Ordered, func() {
30213032
)),
30223033
).To(Equal(9)) // all
30233034

3024-
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM)).To(BeTrue())
3035+
Expect(processor.isDestinationAvailable(eventWithoutDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue())
30253036
Expect(
30263037
len(processor.getConsentFilteredDestinations(
30273038
eventWithoutDeniedConsentsGCM,
@@ -3032,7 +3043,7 @@ var _ = Describe("Processor", Ordered, func() {
30323043
)),
30333044
).To(Equal(9)) // all
30343045

3035-
Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM)).To(BeTrue())
3046+
Expect(processor.isDestinationAvailable(eventWithCustomConsentsGCM, SourceIDGCM, "")).To(BeTrue())
30363047
Expect(
30373048
len(processor.getConsentFilteredDestinations(
30383049
eventWithCustomConsentsGCM,
@@ -3043,7 +3054,7 @@ var _ = Describe("Processor", Ordered, func() {
30433054
)),
30443055
).To(Equal(8)) // all except D13
30453056

3046-
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM)).To(BeTrue())
3057+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCM, SourceIDGCM, "")).To(BeTrue())
30473058
Expect(
30483059
len(processor.getConsentFilteredDestinations(
30493060
eventWithDeniedConsentsGCM,
@@ -3054,7 +3065,7 @@ var _ = Describe("Processor", Ordered, func() {
30543065
)),
30553066
).To(Equal(7)) // all except D6 and D7
30563067

3057-
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM)).To(BeTrue())
3068+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "")).To(BeTrue())
30583069
Expect(
30593070
len(processor.getConsentFilteredDestinations(
30603071
eventWithDeniedConsentsGCMKetch,
@@ -3064,6 +3075,15 @@ var _ = Describe("Processor", Ordered, func() {
30643075
),
30653076
)),
30663077
).To(Equal(8)) // all except D7
3078+
3079+
// some unknown destination ID is passed destination will be unavailable
3080+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDGCM, "unknown-destination")).To(BeFalse())
3081+
3082+
// known destination ID is passed and destination is enabled
3083+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDEnabledA)).To(BeTrue())
3084+
3085+
// know destination ID is passed and destination is not enabled
3086+
Expect(processor.isDestinationAvailable(eventWithDeniedConsentsGCMKetch, SourceIDTransient, DestinationIDDisabled)).To(BeFalse())
30673087
})
30683088
})
30693089

utils/types/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type EventParams struct {
3636
SourceId string `json:"source_id"`
3737
SourceTaskRunId string `json:"source_task_run_id"`
3838
TraceParent string `json:"traceparent"`
39+
DestinationID string `json:"destination_id"`
3940
}
4041

4142
// UserSuppression is interface to access Suppress user feature

0 commit comments

Comments
 (0)