Skip to content

Commit 76b5f69

Browse files
committed
addressed comments
1 parent 49550b3 commit 76b5f69

File tree

2 files changed

+44
-28
lines changed

2 files changed

+44
-28
lines changed

integration_test/docker_test/docker_test.go

+37-21
Original file line numberDiff line numberDiff line change
@@ -556,10 +556,7 @@ func sendEventsToGateway(t *testing.T) {
556556
}
557557
]
558558
}`)
559-
sendEvent(t, payloadRetlWebhook, "retl", writeKey,
560-
withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"),
561-
withHeader("X-Rudder-Destination-Id", "xxxyyyzzP9kQfzOoKd1tuxchYAG"),
562-
withUrlPath("/internal/v1/retl"))
559+
sendRETL(t, payloadRetlWebhook, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzP9kQfzOoKd1tuxchYAG")
563560

564561
payloadRetlKafka := strings.NewReader(`{
565562
"batch":
@@ -653,10 +650,7 @@ func sendEventsToGateway(t *testing.T) {
653650
}
654651
]
655652
}`)
656-
sendEvent(t, payloadRetlKafka, "retl", writeKey,
657-
withHeader("X-Rudder-Source-Id", "xxxyyyzzEaEurW247ad9WYZLUyk"),
658-
withHeader("X-Rudder-Destination-Id", "xxxyyyzzhyrw8v0CrTMrDZ4ovej"),
659-
withUrlPath("/internal/v1/retl"))
653+
sendRETL(t, payloadRetlKafka, "xxxyyyzzEaEurW247ad9WYZLUyk", "xxxyyyzzhyrw8v0CrTMrDZ4ovej")
660654
}
661655

662656
func blockOnHold(t *testing.T) {
@@ -722,20 +716,46 @@ func sendPixelEvents(t *testing.T, writeKey string) {
722716
}
723717
}
724718

725-
func withHeader(key, value string) func(r *http.Request) {
726-
return func(req *http.Request) {
727-
req.Header.Add(key, value)
719+
func sendRETL(t *testing.T, payload *strings.Reader, sourceID, DestinationID string) {
720+
t.Helper()
721+
t.Logf("Sending rETL Event")
722+
723+
var (
724+
httpClient = &http.Client{}
725+
method = "POST"
726+
url = fmt.Sprintf("http://localhost:%s/internal/v1/retl", httpPort)
727+
)
728+
729+
req, err := http.NewRequest(method, url, payload)
730+
if err != nil {
731+
t.Logf("sendEvent error: %v", err)
732+
return
728733
}
729-
}
730734

731-
// withUrlPath will override the path of url in request
732-
func withUrlPath(urlPath string) func(r *http.Request) {
733-
return func(req *http.Request) {
734-
req.URL.Path = urlPath
735+
req.Header.Add("Content-Type", "application/json")
736+
req.Header.Add("X-Rudder-Source-Id", sourceID)
737+
req.Header.Add("X-Rudder-Destination-Id", DestinationID)
738+
739+
res, err := httpClient.Do(req)
740+
if err != nil {
741+
t.Logf("sendEvent error: %v", err)
742+
return
743+
}
744+
defer func() { httputil.CloseResponse(res) }()
745+
746+
body, err := io.ReadAll(res.Body)
747+
if err != nil {
748+
t.Logf("sendEvent error: %v", err)
749+
return
750+
}
751+
if res.Status != "200 OK" {
752+
return
735753
}
754+
755+
t.Logf("Event Sent Successfully: (%s)", body)
736756
}
737757

738-
func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string, reqOptions ...func(r *http.Request)) {
758+
func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) {
739759
t.Helper()
740760
t.Logf("Sending %s Event", callType)
741761

@@ -756,10 +776,6 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string,
756776
[]byte(fmt.Sprintf("%s:", writeKey)),
757777
)))
758778

759-
for _, reqOption := range reqOptions {
760-
reqOption(req)
761-
}
762-
763779
res, err := httpClient.Do(req)
764780
if err != nil {
765781
t.Logf("sendEvent error: %v", err)

processor/processor.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -1544,8 +1544,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
15441544

15451545
outCountMap := make(map[string]int64) // destinations enabled
15461546
destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail)
1547-
// map of messageID to destinationID: for messages that needs to be delivered to a specific destinations only
1548-
msgToDestMap := make(map[string]string)
1547+
// map of jobID to destinationID: for messages that needs to be delivered to a specific destinations only
1548+
jobIDToSpecificDestMapOnly := make(map[int64]string)
15491549

15501550
spans := make([]stats.TraceSpan, 0, len(jobList))
15511551
defer func() {
@@ -1621,8 +1621,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
16211621
// Iterate through all the events in the batch
16221622
for _, singularEvent := range gatewayBatchEvent.Batch {
16231623
messageId := misc.GetStringifiedData(singularEvent["messageId"])
1624-
if len(destinationID) != 0 {
1625-
msgToDestMap[messageId] = destinationID
1624+
if destinationID != "" {
1625+
jobIDToSpecificDestMapOnly[batchEvent.JobID] = destinationID
16261626
}
16271627

16281628
payloadFunc := ro.Memoize(func() json.RawMessage {
@@ -1903,11 +1903,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
19031903
enabledDestinationsList := proc.getConsentFilteredDestinations(
19041904
singularEvent,
19051905
lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool {
1906-
destId := msgToDestMap[event.Metadata.MessageID]
1907-
if len(destId) != 0 {
1906+
destId := jobIDToSpecificDestMapOnly[event.Metadata.JobID]
1907+
if destId != "" {
19081908
return destId == item.ID
19091909
}
1910-
return len(destId) == 0
1910+
return destId == ""
19111911
}),
19121912
)
19131913

0 commit comments

Comments
 (0)