From 0a5a2c933debb3e4207d24fd7ed35f485d9cbbda Mon Sep 17 00:00:00 2001 From: Norbert Kwizera Date: Fri, 8 Oct 2021 18:10:52 +0200 Subject: [PATCH 1/2] Throttle courier queues when the channel has rate limit redis key not expired --- handlers/whatsapp/whatsapp.go | 18 ++++++++++-- handlers/whatsapp/whatsapp_test.go | 6 ++++ queue/queue.go | 14 +++++++++ queue/queue_test.go | 46 ++++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/handlers/whatsapp/whatsapp.go b/handlers/whatsapp/whatsapp.go index e1fd4116d..651b960cd 100644 --- a/handlers/whatsapp/whatsapp.go +++ b/handlers/whatsapp/whatsapp.go @@ -12,6 +12,7 @@ import ( "time" "github.com/buger/jsonparser" + "github.com/gomodule/redigo/redis" "github.com/nyaruka/courier" "github.com/nyaruka/courier/handlers" "github.com/nyaruka/courier/utils" @@ -495,6 +496,9 @@ const maxMsgLength = 4096 // SendMsg sends the passed in message, returning any error func (h *handler) SendMsg(ctx context.Context, msg courier.Msg) (courier.MsgStatus, error) { start := time.Now() + conn := h.Backend().RedisPool().Get() + defer conn.Close() + // get our token token := msg.Channel().StringConfigForKey(courier.ConfigAuthToken, "") if token == "" { @@ -525,7 +529,7 @@ func (h *handler) SendMsg(ctx context.Context, msg courier.Msg) (courier.MsgStat for i, payload := range payloads { externalID := "" - wppID, externalID, logs, err = sendWhatsAppMsg(msg, sendPath, payload) + wppID, externalID, logs, err = sendWhatsAppMsg(conn, msg, sendPath, payload) // add logs to our status for _, log := range logs { status.AddLog(log) @@ -827,7 +831,7 @@ func (h *handler) fetchMediaID(msg courier.Msg, mimeType, mediaURL string) (stri return mediaID, logs, nil } -func sendWhatsAppMsg(msg courier.Msg, sendPath *url.URL, payload interface{}) (string, string, []*courier.ChannelLog, error) { +func sendWhatsAppMsg(rc redis.Conn, msg courier.Msg, sendPath *url.URL, payload interface{}) (string, string, []*courier.ChannelLog, 
error) { start := time.Now() jsonBody, err := json.Marshal(payload) @@ -839,6 +843,16 @@ func sendWhatsAppMsg(msg courier.Msg, sendPath *url.URL, payload interface{}) (s req, _ := http.NewRequest(http.MethodPost, sendPath.String(), bytes.NewReader(jsonBody)) req.Header = buildWhatsAppHeaders(msg.Channel()) rr, err := utils.MakeHTTPRequest(req) + + if rr.StatusCode == 429 || rr.StatusCode == 503 { + rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID().String()) + rc.Do("set", rateLimitKey, "engaged") + rc.Do("expire", rateLimitKey, 2) + + log := courier.NewChannelLogFromRR("rate limit engaged", msg.Channel(), msg.ID(), rr).WithError("Message Send Error", err) + return "", "", []*courier.ChannelLog{log}, err + } + log := courier.NewChannelLogFromRR("Message Sent", msg.Channel(), msg.ID(), rr).WithError("Message Send Error", err) errPayload := &mtErrorPayload{} err = json.Unmarshal(rr.Body, errPayload) diff --git a/handlers/whatsapp/whatsapp_test.go b/handlers/whatsapp/whatsapp_test.go index 40d940445..653a62051 100644 --- a/handlers/whatsapp/whatsapp_test.go +++ b/handlers/whatsapp/whatsapp_test.go @@ -388,6 +388,12 @@ var defaultSendTestCases = []ChannelSendTestCase{ ResponseBody: `{ "errors": [{ "title": "Error Sending" }] }`, ResponseStatus: 403, RequestBody: `{"to":"250788123123","type":"text","text":{"body":"Error"}}`, SendPrep: setSendURL}, + {Label: "Rate Limit Engaged", + Text: "Error", URN: "whatsapp:250788123123", + Status: "E", + ResponseBody: `{ "errors": [{ "title": "Too many requests" }] }`, ResponseStatus: 429, + RequestBody: `{"to":"250788123123","type":"text","text":{"body":"Error"}}`, + SendPrep: setSendURL}, {Label: "No Message ID", Text: "Error", URN: "whatsapp:250788123123", Status: "E", diff --git a/queue/queue.go b/queue/queue.go index 7aa4bebaa..da6b96e83 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -83,10 +83,24 @@ var luaPop = redis.NewScript(2, `-- KEYS: [EpochMS QueueType] local delim = string.find(queue, "|") local tps 
= 0 local tpsKey = "" + + local queueName = "" + if delim then + queueName = string.sub(queue, string.len(KEYS[2])+2, delim-1) tps = tonumber(string.sub(queue, delim+1)) end + if queueName then + local rateLimitKey = "rate_limit:" .. queueName + local rateLimitEngaged = redis.call("get", rateLimitKey) + if rateLimitEngaged then + redis.call("zincrby", KEYS[2] .. ":throttled", workers, queue) + redis.call("zrem", KEYS[2] .. ":active", queue) + return {"retry", ""} + end + end + -- if we have a tps, then check whether we exceed it if tps > 0 then tpsKey = queue .. ":tps:" .. math.floor(KEYS[1]) diff --git a/queue/queue_test.go b/queue/queue_test.go index 157793a0e..81cd415a3 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -50,6 +50,7 @@ func TestLua(t *testing.T) { defer close(quitter) rate := 10 + for i := 0; i < 20; i++ { err := PushOntoQueue(conn, "msgs", "chan1", rate, fmt.Sprintf(`[{"id":%d}]`, i), LowPriority) assert.NoError(err) @@ -166,6 +167,51 @@ func TestLua(t *testing.T) { assert.NoError(err) assert.Equal(EmptyQueue, queue) assert.Empty(value) + + err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority) + assert.NoError(err) + + conn.Do("set", "rate_limit:chan1", "engaged") + conn.Do("EXPIRE", "rate_limit:chan1", 5) + + // we have the rate limit set, + queue, value, err = PopFromQueue(conn, "msgs") + if value != "" && queue != EmptyQueue { + t.Fatal("Should be throttled") + } + + time.Sleep(2 * time.Second) + queue, value, err = PopFromQueue(conn, "msgs") + if value != "" && queue != EmptyQueue { + t.Fatal("Should be throttled") + } + + count, err = redis.Int(conn.Do("zcard", "msgs:throttled")) + assert.NoError(err) + assert.Equal(1, count, "Expected chan1 to be throttled") + + count, err = redis.Int(conn.Do("zcard", "msgs:active")) + assert.NoError(err) + assert.Equal(0, count, "Expected chan1 to not be active") + + // but if we wait for the rate limit to expire + time.Sleep(3 * time.Second) + + // next should be 34 + 
queue, value, err = PopFromQueue(conn, "msgs") + assert.NotEqual(queue, EmptyQueue) + assert.Equal(`{"id":34}`, value) + assert.NoError(err) + + // nothing should be left + queue = Retry + for queue == Retry { + queue, value, err = PopFromQueue(conn, "msgs") + } + assert.NoError(err) + assert.Equal(EmptyQueue, queue) + assert.Empty(value) + } func nTestThrottle(t *testing.T) { From 9560dc96c3d6feb0c89ebe2d87101cb728c6bc7f Mon Sep 17 00:00:00 2001 From: Norbert Kwizera Date: Wed, 8 Dec 2021 16:11:43 +0200 Subject: [PATCH 2/2] Add comment about the 2 seconds pause choice --- handlers/whatsapp/whatsapp.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/handlers/whatsapp/whatsapp.go b/handlers/whatsapp/whatsapp.go index 651b960cd..ee9f30022 100644 --- a/handlers/whatsapp/whatsapp.go +++ b/handlers/whatsapp/whatsapp.go @@ -847,6 +847,10 @@ func sendWhatsAppMsg(rc redis.Conn, msg courier.Msg, sendPath *url.URL, payload if rr.StatusCode == 429 || rr.StatusCode == 503 { rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID().String()) rc.Do("set", rateLimitKey, "engaged") + + // The rate limit is 50 requests per second + // We pause sending for 2 seconds so the limit count is reset + // TODO: In the future we should use the header value when available rc.Do("expire", rateLimitKey, 2) log := courier.NewChannelLogFromRR("rate limit engaged", msg.Channel(), msg.ID(), rr).WithError("Message Send Error", err)