Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter destinations for events from rETL in processor #4247

Merged
merged 5 commits into from
Jan 15, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
addressed comments
mihir20 committed Jan 15, 2024
commit 9c3b414c578ec7dce71d19ce4cac876717088940
58 changes: 37 additions & 21 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
@@ -556,10 +556,7 @@ func sendEventsToGateway(t *testing.T) {
}
]
}`)
sendEvent(t, payloadRetlWebhook, "retl", writeKey,
withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"),
withHeader("X-Rudder-Destination-Id", "xxxyyyzzP9kQfzOoKd1tuxchYAG"),
withUrlPath("/internal/v1/retl"))
sendRETL(t, payloadRetlWebhook, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzP9kQfzOoKd1tuxchYAG")

payloadRetlKafka := strings.NewReader(`{
"batch":
@@ -653,10 +650,7 @@ func sendEventsToGateway(t *testing.T) {
}
]
}`)
sendEvent(t, payloadRetlKafka, "retl", writeKey,
withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"),
withHeader("X-Rudder-Destination-Id", "xxxyyyzzhyrw8v0CrTMrDZ4ovej"),
withUrlPath("/internal/v1/retl"))
sendRETL(t, payloadRetlKafka, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzhyrw8v0CrTMrDZ4ovej")
}

func blockOnHold(t *testing.T) {
@@ -722,20 +716,46 @@ func sendPixelEvents(t *testing.T, writeKey string) {
}
}

func withHeader(key, value string) func(r *http.Request) {
return func(req *http.Request) {
req.Header.Add(key, value)
func sendRETL(t *testing.T, payload *strings.Reader, sourceID, DestinationID string) {
t.Helper()
t.Logf("Sending rETL Event")

var (
httpClient = &http.Client{}
method = "POST"
url = fmt.Sprintf("http://localhost:%s/internal/v1/retl", httpPort)
)

req, err := http.NewRequest(method, url, payload)
if err != nil {
t.Logf("sendEvent error: %v", err)
return
}
}

// withUrlPath will override the path of url in request
func withUrlPath(urlPath string) func(r *http.Request) {
return func(req *http.Request) {
req.URL.Path = urlPath
req.Header.Add("Content-Type", "application/json")
req.Header.Add("X-Rudder-Source-Id", sourceID)
req.Header.Add("X-Rudder-Destination-Id", DestinationID)

res, err := httpClient.Do(req)
if err != nil {
t.Logf("sendEvent error: %v", err)
return
}
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
t.Logf("sendEvent error: %v", err)
return
}
if res.Status != "200 OK" {
return
}

t.Logf("Event Sent Successfully: (%s)", body)
}

func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string, reqOptions ...func(r *http.Request)) {
func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) {
t.Helper()
t.Logf("Sending %s Event", callType)

@@ -756,10 +776,6 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string,
[]byte(fmt.Sprintf("%s:", writeKey)),
)))

for _, reqOption := range reqOptions {
reqOption(req)
}

res, err := httpClient.Do(req)
if err != nil {
t.Logf("sendEvent error: %v", err)
22 changes: 11 additions & 11 deletions processor/processor.go
Original file line number Diff line number Diff line change
@@ -1544,8 +1544,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf

outCountMap := make(map[string]int64) // destinations enabled
destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail)
// map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only
msgToDestMap := make(map[string]string)
// map of jobID to destinationID: for messages that needs to be delivered to a specific destinations only
jobIDToSpecificDestMapOnly := make(map[int64]string)

spans := make([]stats.TraceSpan, 0, len(jobList))
defer func() {
@@ -1564,6 +1564,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
sourceID := eventParams.SourceId
traceParent := eventParams.TraceParent
destinationID := eventParams.DestinationID
if destinationID != "" {
jobIDToSpecificDestMapOnly[batchEvent.JobID] = destinationID
}

var span stats.TraceSpan
if traceParent == "" {
@@ -1621,9 +1624,6 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
// Iterate through all the events in the batch
for _, singularEvent := range gatewayBatchEvent.Batch {
messageId := misc.GetStringifiedData(singularEvent["messageId"])
if len(destinationID) != 0 {
msgToDestMap[messageId] = destinationID
}

payloadFunc := ro.Memoize(func() json.RawMessage {
payloadBytes, err := jsonfast.Marshal(singularEvent)
@@ -1730,9 +1730,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
}
// REPORTING - GATEWAY metrics - END

// Getting all the destinations which are enabled for this
// event will be dropped if no valid destination is present
// if empty string destinationID is all the destinations for the source is validated
// Getting all the destinations which are enabled for this event.
// Event will be dropped if no valid destination is present
// if empty destinationID is passed in this fn all the destinations for the source are validated
// else only passed destinationID will be validated
if !proc.isDestinationAvailable(singularEvent, sourceID, destinationID) {
continue
@@ -1903,11 +1903,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
enabledDestinationsList := proc.getConsentFilteredDestinations(
singularEvent,
lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool {
destId := msgToDestMap[event.Metadata.MessageID]
if len(destId) != 0 {
destId := jobIDToSpecificDestMapOnly[event.Metadata.JobID]
if destId != "" {
return destId == item.ID
}
return len(destId) == 0
return destId == ""
}),
)