@@ -98,7 +98,7 @@ func TestMainFlow(t *testing.T) {
98
98
sendEventsToGateway (t )
99
99
t .Run ("webhook" , func (t * testing.T ) {
100
100
require .Eventually (t , func () bool {
101
- return webhook .RequestsCount () == 10
101
+ return webhook .RequestsCount () == 11
102
102
}, time .Minute , 300 * time .Millisecond )
103
103
104
104
i := - 1
@@ -224,7 +224,7 @@ func TestMainFlow(t *testing.T) {
224
224
225
225
var (
226
226
msgCount = 0 // Count how many message processed
227
- expectedCount = 10
227
+ expectedCount = 15
228
228
timeout = time .After (2 * time .Minute )
229
229
)
230
230
@@ -532,6 +532,131 @@ func sendEventsToGateway(t *testing.T) {
532
532
}` )
533
533
sendEvent (t , payloadGroup , "group" , writeKey )
534
534
sendPixelEvents (t , writeKey )
535
+
536
+ payloadRetlWebhook := strings .NewReader (`{
537
+ "batch":
538
+ [
539
+ {
540
+ "userId": "identified_user_id",
541
+ "anonymousId": "anonymousId_1",
542
+ "type": "identify",
543
+ "context":
544
+ {
545
+ "traits":
546
+ {
547
+ "trait1": "new-val"
548
+ },
549
+ "ip": "14.5.67.21",
550
+ "library":
551
+ {
552
+ "name": "http"
553
+ }
554
+ },
555
+ "timestamp": "2020-02-02T00:23:09.544Z"
556
+ }
557
+ ]
558
+ }` )
559
+ sendEvent (t , payloadRetlWebhook , "retl" , writeKey ,
560
+ withHeader ("X-Rudder-Source-Id" , "xxxyyyzzEaEurW247ad9WYZLUyk" ),
561
+ withHeader ("X-Rudder-Destination-Id" , "xxxyyyzzP9kQfzOoKd1tuxchYAG" ),
562
+ withUrlPath ("/internal/v1/retl" ))
563
+
564
+ payloadRetlKafka := strings .NewReader (`{
565
+ "batch":
566
+ [
567
+ {
568
+ "userId": "identified_user_id",
569
+ "anonymousId": "anonymousId_1",
570
+ "messageId":"messageId_11",
571
+ "type": "identify",
572
+ "context":
573
+ {
574
+ "traits":
575
+ {
576
+ "trait1": "new-val"
577
+ },
578
+ "ip": "14.5.67.21",
579
+ "library":
580
+ {
581
+ "name": "http"
582
+ }
583
+ },
584
+ "timestamp": "2020-02-02T00:23:09.544Z"
585
+ },{
586
+ "userId": "identified_user_id",
587
+ "anonymousId": "anonymousId_1",
588
+ "type": "identify",
589
+ "context":
590
+ {
591
+ "traits":
592
+ {
593
+ "trait1": "new-val"
594
+ },
595
+ "ip": "14.5.67.21",
596
+ "library":
597
+ {
598
+ "name": "http"
599
+ }
600
+ },
601
+ "timestamp": "2020-02-02T00:23:09.544Z"
602
+ },{
603
+ "userId": "identified_user_id",
604
+ "anonymousId": "anonymousId_1",
605
+ "type": "identify",
606
+ "context":
607
+ {
608
+ "traits":
609
+ {
610
+ "trait1": "new-val"
611
+ },
612
+ "ip": "14.5.67.21",
613
+ "library":
614
+ {
615
+ "name": "http"
616
+ }
617
+ },
618
+ "timestamp": "2020-02-02T00:23:09.544Z"
619
+ },{
620
+ "userId": "identified_user_id",
621
+ "anonymousId": "anonymousId_1",
622
+ "type": "identify",
623
+ "context":
624
+ {
625
+ "traits":
626
+ {
627
+ "trait1": "new-val"
628
+ },
629
+ "ip": "14.5.67.21",
630
+ "library":
631
+ {
632
+ "name": "http"
633
+ }
634
+ },
635
+ "timestamp": "2020-02-02T00:23:09.544Z"
636
+ },{
637
+ "userId": "identified_user_id",
638
+ "anonymousId": "anonymousId_1",
639
+ "type": "identify",
640
+ "context":
641
+ {
642
+ "traits":
643
+ {
644
+ "trait1": "new-val"
645
+ },
646
+ "ip": "14.5.67.21",
647
+ "library":
648
+ {
649
+ "name": "http"
650
+ }
651
+ },
652
+ "timestamp": "2020-02-02T00:23:09.544Z"
653
+ }
654
+ ]
655
+ }` )
656
+ sendEvent (t , payloadRetlKafka , "retl" , writeKey ,
657
+ withHeader ("X-Rudder-Source-Id" , "xxxyyyzzEaEurW247ad9WYZLUyk" ),
658
+ withHeader ("X-Rudder-Destination-Id" , "xxxyyyzzhyrw8v0CrTMrDZ4ovej" ),
659
+ withUrlPath ("/internal/v1/retl" ))
535
660
}
536
661
537
662
func blockOnHold (t * testing.T ) {
@@ -597,7 +722,20 @@ func sendPixelEvents(t *testing.T, writeKey string) {
597
722
}
598
723
}
599
724
600
- func sendEvent (t * testing.T , payload * strings.Reader , callType , writeKey string ) {
725
+ func withHeader (key , value string ) func (r * http.Request ) {
726
+ return func (req * http.Request ) {
727
+ req .Header .Add (key , value )
728
+ }
729
+ }
730
+
731
+ // withUrlPath will override the path of url in request
732
+ func withUrlPath (urlPath string ) func (r * http.Request ) {
733
+ return func (req * http.Request ) {
734
+ req .URL .Path = urlPath
735
+ }
736
+ }
737
+
738
+ func sendEvent (t * testing.T , payload * strings.Reader , callType , writeKey string , reqOptions ... func (r * http.Request )) {
601
739
t .Helper ()
602
740
t .Logf ("Sending %s Event" , callType )
603
741
@@ -618,6 +756,10 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string)
618
756
[]byte (fmt .Sprintf ("%s:" , writeKey )),
619
757
)))
620
758
759
+ for _ , reqOption := range reqOptions {
760
+ reqOption (req )
761
+ }
762
+
621
763
res , err := httpClient .Do (req )
622
764
if err != nil {
623
765
t .Logf ("sendEvent error: %v" , err )
0 commit comments