Skip to content

Commit 52d362d

Browse files
authored
feat: add ready queue concurrent processing (#39)
1 parent 68075c0 commit 52d362d

File tree

5 files changed

+114
-58
lines changed

5 files changed

+114
-58
lines changed

README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ Inhooks aims to be a lightweight incoming webhooks gateway solution. Written in
99

1010

1111
## Features
12-
- Receive HTTP Webhooks and Enqueue to redis
12+
- Receive HTTP Webhooks and save to queue
1313
- Fanout messages to multiple HTTP targets
14-
- Delayed processing
15-
- Retries on failure with configurable maximum attempts count, interval, with constant or exponential backoff
14+
- Fast, concurrent processing
15+
- Supports delayed processing
16+
- Supports retries on failure with configurable maximum attempts count, interval, with constant or exponential backoff
1617
- ... more features coming
1718

1819
## Usage

pkg/lib/config.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ type RedisConfig struct {
3030

3131
// Supervisor queues handling settings
3232
type SupervisorConfig struct {
33-
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
34-
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
33+
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
34+
ReadyQueueConcurrency int `env:"SUPERVISOR_READY_QUEUE_CONCURRENCY,default=5"`
35+
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
3536
// interval between scheduler runs to move scheduled jobs to "ready for processing" queue
3637
SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"`
3738
// interval to move back stuck messages from processing to ready queue

pkg/supervisor/processing_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ func TestSupervisorHandleProcessingQueue(t *testing.T) {
5050
DoAndReturn(func(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error) {
5151
s.Shutdown()
5252
return movedMessageIds, nil
53-
}).
54-
Return(movedMessageIds, nil)
53+
})
5554

5655
s.HandleProcessingQueue(flow1, sink1)
5756
}

pkg/supervisor/ready.go

+45-35
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@ import (
1212
func (s *Supervisor) HandleReadyQueue(f *models.Flow, sink *models.Sink) {
1313
logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID))
1414

15+
mChan := make(chan *models.Message, s.appConf.Supervisor.ReadyQueueConcurrency)
16+
17+
for i := 0; i < s.appConf.Supervisor.ReadyQueueConcurrency; i++ {
18+
go s.startReadyProcessor(s.ctx, f, sink, mChan)
19+
}
20+
1521
for {
16-
err := s.FetchAndProcess(s.ctx, f, sink)
22+
m, err := s.messageFetcher.GetMessageForProcessing(s.ctx, s.appConf.Supervisor.ReadyWaitTime, f.ID, sink.ID)
1723
if err != nil && !errors.Is(err, context.Canceled) {
18-
logger.Error("failed to fetch and processed", zap.Error(err))
24+
logger.Error("failed to fetch message for processing", zap.Error(err))
25+
1926
// wait before retrying
2027
timer := time.NewTimer(s.appConf.Supervisor.ErrSleepTime)
2128

@@ -27,6 +34,10 @@ func (s *Supervisor) HandleReadyQueue(f *models.Flow, sink *models.Sink) {
2734
}
2835
}
2936

37+
if m != nil {
38+
mChan <- m
39+
}
40+
3041
// check if channel closed
3142
select {
3243
case <-s.ctx.Done():
@@ -37,41 +48,40 @@ func (s *Supervisor) HandleReadyQueue(f *models.Flow, sink *models.Sink) {
3748
}
3849
}
3950

40-
func (s *Supervisor) FetchAndProcess(ctx context.Context, f *models.Flow, sink *models.Sink) error {
41-
m, err := s.messageFetcher.GetMessageForProcessing(ctx, s.appConf.Supervisor.ReadyWaitTime, f.ID, sink.ID)
42-
if err != nil {
43-
return errors.Wrapf(err, "failed to get message for processing")
44-
}
45-
if m == nil {
46-
// no messages
47-
return nil
48-
}
49-
50-
logger := s.logger.With(
51-
zap.String("flowID", f.ID),
52-
zap.String("sinkID", sink.ID),
53-
zap.String("sinkType", string(sink.Type)),
54-
zap.String("messageID", m.ID),
55-
zap.String("ingestedReqID", m.IngestedReqID),
56-
)
51+
func (s *Supervisor) startReadyProcessor(ctx context.Context, f *models.Flow, sink *models.Sink, mChan chan *models.Message) {
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
return
56+
case m := <-mChan:
57+
logger := s.logger.With(
58+
zap.String("flowID", f.ID),
59+
zap.String("sinkID", sink.ID),
60+
zap.String("sinkType", string(sink.Type)),
61+
zap.String("messageID", m.ID),
62+
zap.String("ingestedReqID", m.IngestedReqID),
63+
)
5764

58-
logger.Info("processing message", zap.Int("attempt#", len(m.DeliveryAttempts)+1))
65+
logger.Info("processing message", zap.Int("attempt#", len(m.DeliveryAttempts)+1))
5966

60-
processingErr := s.messageProcessor.Process(ctx, sink, m)
61-
if processingErr != nil {
62-
logger.Info("message processing failed")
63-
queuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr)
64-
if err != nil {
65-
return errors.Wrapf(err, "could not handle failed processing")
66-
}
67-
logger.Info("message queued after failure", zap.String("queue", string(queuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter))
68-
} else {
69-
logger.Info("message processed ok")
70-
err := s.processingResultsSvc.HandleOK(ctx, m)
71-
if err != nil {
72-
return errors.Wrapf(err, "failed to handle ok processing")
67+
processingErr := s.messageProcessor.Process(ctx, sink, m)
68+
if processingErr != nil {
69+
logger.Info("message processing failed")
70+
queuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr)
71+
if err != nil {
72+
logger.Error("could not handle failed processing", zap.Error(err))
73+
continue
74+
}
75+
logger.Info("message queued after failure", zap.String("queue", string(queuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter))
76+
} else {
77+
logger.Info("message processed ok")
78+
err := s.processingResultsSvc.HandleOK(ctx, m)
79+
if err != nil {
80+
logger.Error("failed to handle ok processing", zap.Error(err))
81+
continue
82+
}
83+
logger.Info("message processed ok - finalized")
84+
}
7385
}
7486
}
75-
76-
return nil
7787
}

pkg/supervisor/ready_test.go

+61-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
"github.com/didil/inhooks/pkg/models"
910
"github.com/didil/inhooks/pkg/testsupport"
@@ -13,13 +14,14 @@ import (
1314
"go.uber.org/zap"
1415
)
1516

16-
func TestSupervisorFetchAndProcess_OK(t *testing.T) {
17+
func TestSupervisorHandleReadyQueue_OK(t *testing.T) {
1718
ctx := context.Background()
1819

1920
appConf, err := testsupport.InitAppConfig(ctx)
2021
assert.NoError(t, err)
2122

2223
appConf.Supervisor.ErrSleepTime = 0
24+
appConf.Supervisor.ReadyQueueConcurrency = 1
2325

2426
ctrl := gomock.NewController(t)
2527
defer ctrl.Finish()
@@ -46,10 +48,6 @@ func TestSupervisorFetchAndProcess_OK(t *testing.T) {
4648
messageProcessor := mocks.NewMockMessageProcessor(ctrl)
4749
processingResultsService := mocks.NewMockProcessingResultsService(ctrl)
4850

49-
messageFetcher.EXPECT().GetMessageForProcessing(ctx, appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).Return(m, nil)
50-
messageProcessor.EXPECT().Process(ctx, sink1, m).Return(nil)
51-
processingResultsService.EXPECT().HandleOK(ctx, m).Return(nil)
52-
5351
logger, err := zap.NewDevelopment()
5452
assert.NoError(t, err)
5553

@@ -61,17 +59,42 @@ func TestSupervisorFetchAndProcess_OK(t *testing.T) {
6159
WithLogger(logger),
6260
)
6361

64-
err = s.FetchAndProcess(ctx, flow1, sink1)
65-
assert.NoError(t, err)
62+
fetcherCallCount := 0
63+
64+
messageFetcher.EXPECT().
65+
GetMessageForProcessing(gomock.Any(), appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).AnyTimes().
66+
DoAndReturn(func(ctx context.Context, timeout time.Duration, flowID string, sinkID string) (*models.Message, error) {
67+
fetcherCallCount++
68+
69+
if fetcherCallCount == 1 {
70+
return m, nil
71+
}
72+
73+
return nil, nil
74+
})
75+
76+
messageProcessor.EXPECT().
77+
Process(gomock.Any(), sink1, m).
78+
DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error {
79+
return nil
80+
})
81+
82+
processingResultsService.EXPECT().HandleOK(gomock.Any(), m).DoAndReturn(func(ctx context.Context, m *models.Message) error {
83+
s.Shutdown()
84+
return nil
85+
})
86+
87+
s.HandleReadyQueue(flow1, sink1)
6688
}
6789

68-
func TestSupervisorFetchAndProcess_Failed(t *testing.T) {
90+
func TestSupervisorHandleReadyQueue_Failed(t *testing.T) {
6991
ctx := context.Background()
7092

7193
appConf, err := testsupport.InitAppConfig(ctx)
7294
assert.NoError(t, err)
7395

7496
appConf.Supervisor.ErrSleepTime = 0
97+
appConf.Supervisor.ReadyQueueConcurrency = 1
7598

7699
ctrl := gomock.NewController(t)
77100
defer ctrl.Finish()
@@ -94,16 +117,10 @@ func TestSupervisorFetchAndProcess_Failed(t *testing.T) {
94117
ID: mID1,
95118
}
96119

97-
processingErr := fmt.Errorf("processing error")
98-
99120
messageFetcher := mocks.NewMockMessageFetcher(ctrl)
100121
messageProcessor := mocks.NewMockMessageProcessor(ctrl)
101122
processingResultsService := mocks.NewMockProcessingResultsService(ctrl)
102123

103-
messageFetcher.EXPECT().GetMessageForProcessing(ctx, appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).Return(m, nil)
104-
messageProcessor.EXPECT().Process(ctx, sink1, m).Return(processingErr)
105-
processingResultsService.EXPECT().HandleFailed(ctx, sink1, m, processingErr).Return(&models.QueuedInfo{QueueStatus: models.QueueStatusReady}, nil)
106-
107124
logger, err := zap.NewDevelopment()
108125
assert.NoError(t, err)
109126

@@ -115,6 +132,34 @@ func TestSupervisorFetchAndProcess_Failed(t *testing.T) {
115132
WithLogger(logger),
116133
)
117134

118-
err = s.FetchAndProcess(ctx, flow1, sink1)
119-
assert.NoError(t, err)
135+
processingErr := fmt.Errorf("processing error")
136+
137+
fetcherCallCount := 0
138+
messageFetcher.EXPECT().
139+
GetMessageForProcessing(gomock.Any(), appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).AnyTimes().
140+
DoAndReturn(func(ctx context.Context, timeout time.Duration, flowID string, sinkID string) (*models.Message, error) {
141+
fetcherCallCount++
142+
143+
if fetcherCallCount == 1 {
144+
return m, nil
145+
}
146+
147+
return nil, nil
148+
})
149+
150+
messageProcessor.EXPECT().
151+
Process(gomock.Any(), sink1, m).
152+
DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error {
153+
return processingErr
154+
})
155+
156+
processingResultsService.EXPECT().
157+
HandleFailed(gomock.Any(), sink1, m, processingErr).
158+
DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) {
159+
s.Shutdown()
160+
161+
return &models.QueuedInfo{QueueStatus: models.QueueStatusReady}, nil
162+
})
163+
164+
s.HandleReadyQueue(flow1, sink1)
120165
}

0 commit comments

Comments
 (0)