Skip to content

Commit c41ae57

Browse files
authored
chore: remove unused code (#4579)
1 parent ac14cdb commit c41ae57

File tree

8 files changed

+20
-179
lines changed

8 files changed

+20
-179
lines changed

router/batchrouter/handle_async.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM
556556

557557
switch status.JobState {
558558
case jobsdb.Failed.State:
559-
if status.ErrorCode != strconv.Itoa(types.RouterTimedOutStatusCode) && status.ErrorCode != strconv.Itoa(types.RouterUnMarshalErrorCode) {
559+
if status.ErrorCode != strconv.Itoa(types.RouterUnMarshalErrorCode) {
560560
if status.AttemptNum == 1 {
561561
sd.Count++
562562
}
@@ -661,8 +661,6 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
661661
}
662662
if attempted {
663663
status.AttemptNum = attemptNums[jobId] + 1
664-
} else {
665-
status.ErrorCode = strconv.Itoa(types.RouterTimedOutStatusCode)
666664
}
667665

668666
if brt.retryLimitReached(&status) {

router/handle.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
367367

368368
switch workerJobStatus.status.JobState {
369369
case jobsdb.Failed.State:
370-
if workerJobStatus.status.ErrorCode != strconv.Itoa(types.RouterTimedOutStatusCode) && workerJobStatus.status.ErrorCode != strconv.Itoa(types.RouterUnMarshalErrorCode) {
370+
if workerJobStatus.status.ErrorCode != strconv.Itoa(types.RouterUnMarshalErrorCode) {
371371
if workerJobStatus.status.AttemptNum == 1 {
372372
sd.Count++
373373
}
@@ -644,8 +644,7 @@ func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) (bool, string) {
644644
func (rt *Handle) retryLimitReached(status *jobsdb.JobStatusT) bool {
645645
respStatusCode, _ := strconv.Atoi(status.ErrorCode)
646646
switch respStatusCode {
647-
case types.RouterTimedOutStatusCode,
648-
types.RouterUnMarshalErrorCode: // 5xx errors
647+
case types.RouterUnMarshalErrorCode: // 5xx errors
649648
return false
650649
}
651650

router/handle_lifecycle.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010

1111
"github.com/rudderlabs/rudder-go-kit/bytesize"
1212

13-
"github.com/samber/lo"
1413
"golang.org/x/sync/errgroup"
1514

15+
"github.com/samber/lo"
16+
1617
"github.com/rudderlabs/rudder-go-kit/config"
1718
"github.com/rudderlabs/rudder-go-kit/logger"
1819
"github.com/rudderlabs/rudder-go-kit/stats"
@@ -291,7 +292,6 @@ func (rt *Handle) setupReloadableVars() {
291292
rt.reloadableConfig.maxFailedCountForJob = config.GetReloadableIntVar(3, 1, "Router."+rt.destType+".maxFailedCountForJob", "Router.maxFailedCountForJob")
292293
rt.reloadableConfig.maxFailedCountForSourcesJob = config.GetReloadableIntVar(3, 1, "Router.RSources"+rt.destType+".maxFailedCountForJob", "Router.RSources.maxFailedCountForJob")
293294
rt.reloadableConfig.payloadLimit = config.GetReloadableInt64Var(100*bytesize.MB, 1, "Router."+rt.destType+".PayloadLimit", "Router.PayloadLimit")
294-
rt.reloadableConfig.routerTimeout = config.GetReloadableDurationVar(3600, time.Second, "Router."+rt.destType+".routerTimeout", "Router.routerTimeout")
295295
rt.reloadableConfig.retryTimeWindow = config.GetReloadableDurationVar(180, time.Minute, "Router."+rt.destType+".retryTimeWindow", "Router.retryTimeWindow")
296296
rt.reloadableConfig.sourcesRetryTimeWindow = config.GetReloadableDurationVar(1, time.Minute, "Router.RSources"+rt.destType+".retryTimeWindow", "Router.RSources.retryTimeWindow")
297297
rt.reloadableConfig.maxDSQuerySize = config.GetReloadableIntVar(10, 1, "Router."+rt.destType+".maxDSQuery", "Router.maxDSQuery")

router/network.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type netHandle struct {
3434
logger logger.Logger
3535
}
3636

37-
// Network interface
37+
// NetHandle interface
3838
type NetHandle interface {
3939
SendPost(ctx context.Context, structData integrations.PostParametersT) *utils.SendPostResponse
4040
}
@@ -137,7 +137,7 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
137137
}
138138
var buf bytes.Buffer
139139
zw := gzip.NewWriter(&buf)
140-
defer zw.Close()
140+
defer func() { _ = zw.Close() }()
141141

142142
if _, err := zw.Write([]byte(strValue)); err != nil {
143143
return &utils.SendPostResponse{

router/router_test.go

+10-149
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"math"
87
"slices"
98
"sync/atomic"
109
"testing"
@@ -159,7 +158,7 @@ type drainer struct {
159158
reason string
160159
}
161160

162-
func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
161+
func (d *drainer) Drain(_ *jobsdb.JobT) (bool, string) {
163162
return d.drain, d.reason
164163
}
165164

@@ -176,15 +175,15 @@ func (m *mockThrottlerFactory) Shutdown() {}
176175

177176
func TestBackoff(t *testing.T) {
178177
t.Run("nextAttemptAfter", func(t *testing.T) {
179-
min := 10 * time.Second
180-
max := 300 * time.Second
181-
require.Equal(t, 10*time.Second, nextAttemptAfter(0, min, max))
182-
require.Equal(t, 10*time.Second, nextAttemptAfter(1, min, max))
183-
require.Equal(t, 20*time.Second, nextAttemptAfter(2, min, max))
184-
require.Equal(t, 40*time.Second, nextAttemptAfter(3, min, max))
185-
require.Equal(t, 80*time.Second, nextAttemptAfter(4, min, max))
186-
require.Equal(t, 160*time.Second, nextAttemptAfter(5, min, max))
187-
require.Equal(t, 300*time.Second, nextAttemptAfter(6, min, max))
178+
minBackoff := 10 * time.Second
179+
maxBackoff := 300 * time.Second
180+
require.Equal(t, 10*time.Second, nextAttemptAfter(0, minBackoff, maxBackoff))
181+
require.Equal(t, 10*time.Second, nextAttemptAfter(1, minBackoff, maxBackoff))
182+
require.Equal(t, 20*time.Second, nextAttemptAfter(2, minBackoff, maxBackoff))
183+
require.Equal(t, 40*time.Second, nextAttemptAfter(3, minBackoff, maxBackoff))
184+
require.Equal(t, 80*time.Second, nextAttemptAfter(4, minBackoff, maxBackoff))
185+
require.Equal(t, 160*time.Second, nextAttemptAfter(5, minBackoff, maxBackoff))
186+
require.Equal(t, 300*time.Second, nextAttemptAfter(6, minBackoff, maxBackoff))
188187
})
189188

190189
t.Run("findWorker", func(t *testing.T) {
@@ -1058,138 +1057,6 @@ var _ = Describe("router", func() {
10581057
}, 60*time.Second, 10*time.Millisecond).
10591058
Should(Equal(true), fmt.Sprintf("Router should both abort (actual: %t) and store to proc error (actual: %t)", routerAborted, procErrorStored))
10601059
})
1061-
1062-
It("can fail jobs if time is more than router timeout", func() {
1063-
mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl)
1064-
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
1065-
router := &Handle{
1066-
Reporting: &reporting.NOOP{},
1067-
netHandle: mockNetHandle,
1068-
}
1069-
c.mockBackendConfig.EXPECT().AccessToken().AnyTimes()
1070-
router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), throttler.NewNoOpThrottlerFactory())
1071-
router.transformer = mockTransformer
1072-
router.noOfWorkers = 1
1073-
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(5)
1074-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(0))
1075-
1076-
gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
1077-
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, gaDestinationID) // skipcq: GO-R4002
1078-
1079-
toRetryJobsList := []*jobsdb.JobT{
1080-
{
1081-
UUID: uuid.New(),
1082-
UserID: "u1",
1083-
JobID: 2009,
1084-
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1085-
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1086-
CustomVal: customVal["GA"],
1087-
EventPayload: []byte(gaPayload),
1088-
LastJobStatus: jobsdb.JobStatusT{
1089-
AttemptNum: 1,
1090-
ErrorResponse: []byte(`{"firstAttemptedAt": "2021-06-28T15:57:30.742+05:30"}`),
1091-
},
1092-
Parameters: []byte(parameters),
1093-
},
1094-
}
1095-
1096-
unprocessedJobsList := []*jobsdb.JobT{
1097-
{
1098-
UUID: uuid.New(),
1099-
UserID: "u1",
1100-
JobID: 2010,
1101-
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1102-
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1103-
CustomVal: customVal["GA"],
1104-
EventPayload: []byte(gaPayload),
1105-
LastJobStatus: jobsdb.JobStatusT{
1106-
AttemptNum: 0,
1107-
},
1108-
Parameters: []byte(parameters),
1109-
},
1110-
{
1111-
UUID: uuid.New(),
1112-
UserID: "u2",
1113-
JobID: 2011,
1114-
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1115-
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1116-
CustomVal: customVal["GA"],
1117-
EventPayload: []byte(gaPayload),
1118-
LastJobStatus: jobsdb.JobStatusT{
1119-
AttemptNum: 0,
1120-
},
1121-
Parameters: []byte(parameters),
1122-
},
1123-
{
1124-
UUID: uuid.New(),
1125-
UserID: "u2",
1126-
JobID: 2012,
1127-
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1128-
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1129-
CustomVal: customVal["GA"],
1130-
EventPayload: []byte(gaPayload),
1131-
LastJobStatus: jobsdb.JobStatusT{
1132-
AttemptNum: 0,
1133-
},
1134-
Parameters: []byte(parameters),
1135-
},
1136-
{
1137-
UUID: uuid.New(),
1138-
UserID: "u3",
1139-
JobID: 2013,
1140-
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1141-
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
1142-
CustomVal: customVal["GA"],
1143-
EventPayload: []byte(gaPayload),
1144-
LastJobStatus: jobsdb.JobStatusT{
1145-
AttemptNum: 0,
1146-
},
1147-
Parameters: []byte(parameters),
1148-
},
1149-
}
1150-
1151-
allJobs := append(toRetryJobsList, unprocessedJobsList...)
1152-
1153-
payloadLimit := router.reloadableConfig.payloadLimit
1154-
callAllJobs := c.mockRouterJobsDB.EXPECT().GetToProcess(gomock.Any(),
1155-
jobsdb.GetQueryParams{
1156-
CustomValFilters: []string{customVal["GA"]},
1157-
ParameterFilters: []jobsdb.ParameterFilterT{{Name: "destination_id", Value: gaDestinationID}},
1158-
PayloadSizeLimit: payloadLimit.Load(),
1159-
JobsLimit: 10000,
1160-
}, nil).Times(1).Return(&jobsdb.MoreJobsResult{JobsResult: jobsdb.JobsResult{Jobs: allJobs}}, nil)
1161-
1162-
c.mockRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
1163-
Do(func(ctx context.Context, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
1164-
assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Executing.State, "", `{}`, 1)
1165-
assertJobStatus(unprocessedJobsList[0], statuses[1], jobsdb.Executing.State, "", `{}`, 0)
1166-
assertJobStatus(unprocessedJobsList[1], statuses[2], jobsdb.Executing.State, "", `{}`, 0)
1167-
assertJobStatus(unprocessedJobsList[2], statuses[3], jobsdb.Executing.State, "", `{}`, 0)
1168-
assertJobStatus(unprocessedJobsList[3], statuses[4], jobsdb.Executing.State, "", `{}`, 0)
1169-
}).Return(nil).After(callAllJobs)
1170-
mockNetHandle.EXPECT().SendPost(gomock.Any(), gomock.Any()).Times(0)
1171-
done := make(chan struct{})
1172-
c.mockRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
1173-
_ = f(jobsdb.EmptyUpdateSafeTx())
1174-
close(done)
1175-
}).Return(nil)
1176-
c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1)
1177-
1178-
<-router.backendConfigInitialized
1179-
worker := newPartitionWorker(context.Background(), router, gaDestinationID)
1180-
defer worker.Stop()
1181-
Expect(worker.Work()).To(BeTrue())
1182-
Expect(worker.pickupCount).To(Equal(5))
1183-
Eventually(func() bool {
1184-
select {
1185-
case <-done:
1186-
return true
1187-
default:
1188-
return false
1189-
}
1190-
}, 20*time.Second, 100*time.Millisecond).Should(Equal(true))
1191-
})
1192-
11931060
It("aborts jobs if destination is not found in config", func() {
11941061
mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl)
11951062
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
@@ -1202,7 +1069,6 @@ var _ = Describe("router", func() {
12021069
router.transformer = mockTransformer
12031070
router.noOfWorkers = 1
12041071
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(5)
1205-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(60 * time.Second)
12061072

12071073
gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
12081074
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, nonexistentDestinationID) // skipcq: GO-R4002
@@ -1304,7 +1170,6 @@ var _ = Describe("router", func() {
13041170
router.enableBatching = true
13051171
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(3)
13061172
router.noOfWorkers = 1
1307-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(math.MaxInt64))
13081173

13091174
gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
13101175
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, gaDestinationID) // skipcq: GO-R4002
@@ -1623,7 +1488,6 @@ var _ = Describe("router", func() {
16231488
router.transformer = mockTransformer
16241489
router.noOfWorkers = 1
16251490
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(5)
1626-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(math.MaxInt64))
16271491

16281492
gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
16291493
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "router"}`, gaDestinationID) // skipcq: GO-R4002
@@ -1833,7 +1697,6 @@ var _ = Describe("router", func() {
18331697
router.transformer = mockTransformer
18341698
router.noOfWorkers = 1
18351699
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(3)
1836-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(math.MaxInt64))
18371700

18381701
gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
18391702
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "router"}`, gaDestinationID) // skipcq: GO-R4002
@@ -2161,7 +2024,6 @@ var _ = Describe("router", func() {
21612024
router.transformer = mockTransformer
21622025

21632026
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(3)
2164-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(math.MaxInt64))
21652027
router.reloadableConfig.transformerProxy = config.SingleValueLoader(true)
21662028
router.noOfWorkers = 1
21672029

@@ -2308,7 +2170,6 @@ var _ = Describe("router", func() {
23082170
router.transformer = mockTransformer
23092171

23102172
router.reloadableConfig.noOfJobsToBatchInAWorker = config.SingleValueLoader(3)
2311-
router.reloadableConfig.routerTimeout = config.SingleValueLoader(time.Duration(math.MaxInt64))
23122173
router.reloadableConfig.transformerProxy = config.SingleValueLoader(true)
23132174
router.noOfWorkers = 1
23142175

router/types.go

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ type reloadableConfig struct {
7070
maxFailedCountForJob config.ValueLoader[int]
7171
maxFailedCountForSourcesJob config.ValueLoader[int]
7272
payloadLimit config.ValueLoader[int64]
73-
routerTimeout config.ValueLoader[time.Duration]
7473
retryTimeWindow config.ValueLoader[time.Duration]
7574
sourcesRetryTimeWindow config.ValueLoader[time.Duration]
7675
pickupFlushInterval config.ValueLoader[time.Duration]

router/types/types.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ import (
1111
"github.com/rudderlabs/rudder-server/jobsdb"
1212
)
1313

14-
const (
15-
RouterUnMarshalErrorCode = 599
16-
RouterTimedOutStatusCode = 1113
17-
)
14+
const RouterUnMarshalErrorCode = 599
1815

1916
// RouterJobT holds the router job and its related metadata
2017
type RouterJobT struct {

router/worker.go

+2-15
Original file line numberDiff line numberDiff line change
@@ -470,20 +470,7 @@ func (w *worker) processDestinationJobs() {
470470
// router_delivery_exceeded_timeout -> goes to zero
471471
ch := w.trackStuckDelivery()
472472

473-
// Assuming twice the overhead - defensive: 30% was just fine though
474-
// In fact, the timeout should be more than the maximum latency allowed by these workers.
475-
// Assuming 10s maximum latency
476-
elapsed := time.Since(w.processingStartTime)
477-
threshold := w.rt.reloadableConfig.routerTimeout.Load()
478-
if elapsed > threshold {
479-
respStatusCode := types.RouterTimedOutStatusCode
480-
respBody := fmt.Sprintf("Failed with status code %d as the jobs took more time than expected. Will be retried", types.RouterTimedOutStatusCode)
481-
respStatusCodes, respBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBody)
482-
w.logger.Debugf(
483-
"Will drop with %d because of time expiry %v",
484-
types.RouterTimedOutStatusCode, destinationJob.JobMetadataArray[0].JobID,
485-
)
486-
} else if w.rt.customDestinationManager != nil {
473+
if w.rt.customDestinationManager != nil {
487474
for _, destinationJobMetadata := range destinationJob.JobMetadataArray {
488475
if destinationID != destinationJobMetadata.DestinationID {
489476
panic(fmt.Errorf("different destinations are grouped together"))
@@ -1094,7 +1081,7 @@ func (w *worker) sendEventDeliveryStat(destinationJobMetadata *types.JobMetadata
10941081

10951082
func (w *worker) sendDestinationResponseToConfigBackend(payload json.RawMessage, destinationJobMetadata *types.JobMetadataT, status *jobsdb.JobStatusT, sourceIDs []string) {
10961083
// Sending destination response to config backend
1097-
if status.ErrorCode != fmt.Sprint(types.RouterUnMarshalErrorCode) && status.ErrorCode != fmt.Sprint(types.RouterTimedOutStatusCode) {
1084+
if status.ErrorCode != fmt.Sprint(types.RouterUnMarshalErrorCode) {
10981085
deliveryStatus := destinationdebugger.DeliveryStatusT{
10991086
DestinationID: destinationJobMetadata.DestinationID,
11001087
SourceID: strings.Join(sourceIDs, ","),

0 commit comments

Comments
 (0)