Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/v7.1.14 #93

Open
wants to merge 85 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
0a5a2c9
Throttle courier queues when the channel has rate limit redis key not…
norkans7 Oct 8, 2021
d63ccf3
refactor external channel handler to use headers config on send
rasoro Oct 28, 2021
2c33fc2
Add support for 'Expired' status in the AT handler
jasonrogena Nov 10, 2021
0ae35b6
Test with Redis 3.2.4
rowanseymour Nov 19, 2021
7d49998
Remove chatbase support
rowanseymour Nov 22, 2021
3368e1a
Merge pull request #388 from nyaruka/remove_chatbase
rowanseymour Nov 23, 2021
382ab18
Merge pull request #387 from jasonrogena/at-expired-status
rowanseymour Nov 23, 2021
9fef1ba
Update CHANGELOG.md for v7.1.0
rowanseymour Nov 23, 2021
71f1763
Pin to go 1.17.2
rowanseymour Nov 23, 2021
7466834
Update CHANGELOG.md for v7.1.1
rowanseymour Nov 23, 2021
a1f5a5d
add support to old way to set authorization token header
rasoro Nov 23, 2021
6123ec0
Merge pull request #386 from Ilhasoft/feature/external-channel-header…
rowanseymour Nov 23, 2021
538fdcf
Add comment
rowanseymour Nov 23, 2021
3447101
Update CHANGELOG.md for v7.1.2
rowanseymour Nov 23, 2021
42f1110
Use response_to_external_id instead of response_to_id
rowanseymour Nov 30, 2021
19771f2
Merge pull request #389 from nyaruka/no_response_to
rowanseymour Dec 1, 2021
b609d29
Update CHANGELOG.md for v7.1.3
rowanseymour Dec 1, 2021
30f9a91
Remove loop detection now that mailroom does this
rowanseymour Dec 1, 2021
bef57dd
Merge pull request #385 from Ilhasoft/feature/viber-config-button-layout
rowanseymour Dec 2, 2021
b6b72d9
Merge pull request #390 from nyaruka/remove_loop_detection
rowanseymour Dec 2, 2021
99c9b6a
Update CHANGELOG.md for v7.1.4
rowanseymour Dec 2, 2021
756b32f
Add Msg.failed_reason
rowanseymour Dec 6, 2021
d137ef3
Merge pull request #391 from nyaruka/failed_reason
rowanseymour Dec 6, 2021
ca40349
Update CHANGELOG.md for v7.1.5
rowanseymour Dec 6, 2021
46317fa
Merge branch 'main' of github.com:nyaruka/courier into fix-WA-rate-limit
norkans7 Dec 8, 2021
9560dc9
Add comment about the 2 seconds pause choice
norkans7 Dec 8, 2021
46475f1
Merge pull request #382 from nyaruka/fix-WA-rate-limit
rowanseymour Dec 8, 2021
cb453c1
Update CHANGELOG.md for v7.1.6
rowanseymour Dec 8, 2021
718aaa1
Update .gitignore to include deploy/
rowanseymour Dec 10, 2021
3233f3d
Add instagram handler
Robi9 Nov 29, 2021
ba97328
refactor instagram.go
Robi9 Nov 29, 2021
91885b2
Refactor instagram handler
Robi9 Nov 30, 2021
86f48fe
Add environment variables to instagram
Robi9 Dec 3, 2021
2cf7699
fix: Metadata search for a new contact
Robi9 Dec 16, 2021
3cfcb5a
add import for instagram handler
matmsa27 Dec 17, 2021
7965bd8
Refactor response field to external ID
Robi9 Dec 17, 2021
79a8110
Merge pull request #87 from Ilhasoft/feat/instagram-channel
Robi9 Dec 17, 2021
d27ee1a
feat: Add Instagram channel support to Facebook handler
Robi9 Dec 23, 2021
b914672
Remove unused instagram handler files
Robi9 Dec 23, 2021
fe834f7
Ignore story mention callback
Robi9 Dec 28, 2021
2055330
Add quick replies for vk
Robi9 Dec 30, 2021
220295b
Add support for new keyboard rows
Robi9 Jan 4, 2022
238050c
Remove unused Instagram-type configuration variables
Robi9 Jan 4, 2022
42951fd
Add story mention skip test coverage
Robi9 Jan 4, 2022
eb0c1fd
Merge pull request #394 from Ilhasoft/feature/quick-replies-vk
rowanseymour Jan 5, 2022
df30df8
Update to gocommon v1.15.1
Robi9 Jan 5, 2022
140b70c
Change of urn in instagram tests
Robi9 Jan 5, 2022
4a7fcc9
Rename validateSignatures parameter to useUUIDRoutes
Robi9 Jan 5, 2022
eb6d0aa
Separate TestDescribe for type IG and FBA
Robi9 Jan 5, 2022
4ade345
Use dbutil package from gocommon
rowanseymour Jan 7, 2022
9ce5e88
Change 'EntryID' to 'entryID'
Robi9 Jan 7, 2022
eef3716
Fix variable names
Robi9 Jan 7, 2022
c97ea0b
Rename variable in test cases
Robi9 Jan 7, 2022
479d376
Merge pull request #395 from nyaruka/dbutil
rowanseymour Jan 7, 2022
3163cff
Update CHANGELOG.md for v7.1.7
rowanseymour Jan 7, 2022
db8727b
Do more error wrapping when creating contacts and URNs
rowanseymour Jan 7, 2022
43b982c
Merge branch 'main' into feature/instagram-channel
Robi9 Jan 7, 2022
56b2beb
Merge pull request #396 from nyaruka/extra_errors
rowanseymour Jan 10, 2022
207d7bf
Update CHANGELOG.md for v7.1.8
rowanseymour Jan 10, 2022
50064d1
Fix bulk status updates
rowanseymour Jan 10, 2022
1b867a6
Merge pull request #398 from nyaruka/status_update_fix
rowanseymour Jan 10, 2022
2fd9ac0
Update CHANGELOG.md for v7.1.9
rowanseymour Jan 10, 2022
63e6361
Update to latest gocommon
rowanseymour Jan 10, 2022
4a8a9e6
Update CHANGELOG.md for v7.1.10
rowanseymour Jan 10, 2022
6f444e7
More bulk sql tweaks
rowanseymour Jan 10, 2022
71d5ddf
Update CHANGELOG.md for v7.1.11
rowanseymour Jan 10, 2022
7e8a198
Update to latest gocommon
rowanseymour Jan 12, 2022
bacb70e
Merge pull request #392 from Ilhasoft/feature/instagram-channel
rowanseymour Jan 13, 2022
e51540a
Merge pull request #399 from nyaruka/latest_gocommon
rowanseymour Jan 13, 2022
966c851
Update CHANGELOG.md for v7.1.12
rowanseymour Jan 13, 2022
93f74a0
Added Session Status and modified test case
alviriseup Jan 18, 2022
0fda71b
Merge pull request #402 from alviriseup/main
rowanseymour Jan 18, 2022
a23b58d
Send db and redis stats to librato in backed heartbeat
rowanseymour Jan 18, 2022
9c8dde5
Merge pull request #404 from nyaruka/db_redis_stats
rowanseymour Jan 18, 2022
fd95b68
Update CHANGELOG.md for v7.1.13
rowanseymour Jan 18, 2022
54ca176
Add support to receive button text from Twilio WhatsApp
norkans7 Jan 19, 2022
e3f9600
Support sending WA quick replies when we have attachments too
norkans7 Jan 19, 2022
8b01318
Allow more active redis connections
rowanseymour Jan 19, 2022
6104fd1
Merge pull request #406 from nyaruka/WA-QR-attachments
rowanseymour Jan 19, 2022
67b27dc
Merge pull request #405 from nyaruka/TWA-buttons-receive
rowanseymour Jan 19, 2022
347d806
Merge pull request #407 from nyaruka/more_redis_conns
rowanseymour Jan 19, 2022
cbfd24d
Update CHANGELOG.md for v7.1.14
rowanseymour Jan 19, 2022
0cccda2
Merge nyaruka 'main' branch in update/v7.1.14
Robi9 Jan 21, 2022
4be6653
Fix payload build
Robi9 Jan 25, 2022
2666dcf
Fix facebookapp handler tests
Robi9 Jan 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
name: CI
on: [push, pull_request]
env:
go-version: '1.17.x'
go-version: '1.17.2' # https://github.com/golang/go/issues/49366
redis-version: '3.2.4'
jobs:
test:
name: Test
Expand All @@ -16,7 +17,7 @@ jobs:
- name: Install Redis
uses: zhulik/[email protected]
with:
redis version: '5'
redis version: ${{ env.redis-version }}

- name: Install PostgreSQL
uses: harmon758/postgresql-action@v1
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.a
*.so
*~
deploy
fabric
fabfile.py
fabfile.pyc
Expand Down Expand Up @@ -34,4 +35,4 @@ _testmain.go
dist/
.envrc
courier
_storage
_storage
68 changes: 68 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,71 @@
v7.1.14
----------
* Allow more active redis connections
* Support sending WA quick replies when we have attachments too
* Add support to receive button text from Twilio WhatsApp

v7.1.13
----------
* Send db and redis stats to librato in backed heartbeat
* Include session_status in FCM payloads

v7.1.12
----------
* Update to latest gocommon
* Add instagram handler

v7.1.11
----------
* More bulk sql tweaks

v7.1.10
----------
* Update to latest gocommon

v7.1.9
----------
* Fix bulk status updates

v7.1.8
----------
* Do more error wrapping when creating contacts and URNs

v7.1.7
----------
* Use dbutil package from gocommon
* Add quick replies for vk

v7.1.6
----------
* Throttle WA queues when we get 429 responses

v7.1.5
----------
* Add Msg.failed_reason and set when msg fails due to reaching error limit

v7.1.4
----------
* Remove loop detection now that mailroom does this
* Smarter organization of quick replies for viber keyboards

v7.1.3
----------
* Use response_to_external_id instead of response_to_id

v7.1.2
----------
* External channel handler should use headers config setting if provided

v7.1.1
----------
* Pin to go 1.17.2

v7.1.0
----------
* Remove chatbase support
* Test with Redis 3.2.4
* Add support for 'Expired' status in the AT handler

v7.0.0
----------
* Tweak README
Expand Down
4 changes: 0 additions & 4 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ type Backend interface {
// a message is being forced in being resent by a user
ClearMsgSent(context.Context, MsgID) error

// IsMsgLoop returns whether the passed in message is part of a message loop, possibly with another bot. Backends should
// implement their own logic to implement this.
IsMsgLoop(ctx context.Context, msg Msg) (bool, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// used to determine any sort of deduping of msg sends
Expand Down
143 changes: 57 additions & 86 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/batch"
"github.com/nyaruka/courier/chatbase"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/storage"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/librato"
Expand All @@ -33,17 +32,9 @@ const msgQueueName = "msgs"
// the name of our set for tracking sends
const sentSetName = "msgs_sent_%s"

// constants used in org configs for chatbase
const chatbaseAPIKey = "CHATBASE_API_KEY"
const chatbaseVersion = "CHATBASE_VERSION"
const chatbaseMessageType = "agent"

// our timeout for backend operations
const backendTimeout = time.Second * 20

// number of messages for loop detection
const msgLoopThreshold = 20

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand Down Expand Up @@ -208,63 +199,6 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return err
}

var luaMsgLoop = redis.NewScript(3, `-- KEYS: [key, contact_id, text]
local key = KEYS[1]
local contact_id = KEYS[2]
local text = KEYS[3]
local count = 1

-- try to look up in window
local record = redis.call("hget", key, contact_id)
if record then
local record_count = tonumber(string.sub(record, 1, 2))
local record_text = string.sub(record, 4, -1)

if record_text == text then
count = math.min(record_count + 1, 99)
else
count = 1
end
end

-- create our new record with our updated count
record = string.format("%02d:%s", count, text)

-- write our new record with updated count
redis.call("hset", key, contact_id, record)

-- sets its expiration
redis.call("expire", key, 300)

return count
`)

// IsMsgLoop checks whether the passed in message is part of a loop
func (b *backend) IsMsgLoop(ctx context.Context, msg courier.Msg) (bool, error) {
m := msg.(*DBMsg)

// things that aren't replies can't be loops, neither do we count retries
if m.ResponseToID_ == courier.NilMsgID || m.ErrorCount_ > 0 {
return false, nil
}

// otherwise run our script to check whether this is a loop in the past 5 minutes
rc := b.redisPool.Get()
defer rc.Close()

keyTime := time.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf(sentSetName, fmt.Sprintf("loop_msgs:%s", keyTime.Format("2006-01-02-15:04")))
count, err := redis.Int(luaMsgLoop.Do(rc, key, m.ContactID_, m.Text_))
if err != nil {
return false, errors.Wrapf(err, "error while checking for msg loop")
}

if count >= msgLoopThreshold {
return true, nil
}
return false, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, status courier.MsgStatus) {
rc := b.redisPool.Get()
Expand Down Expand Up @@ -292,16 +226,6 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg,
}
}
}

// if this org has chatbase connected, notify chatbase
chatKey, _ := msg.Channel().OrgConfigForKey(chatbaseAPIKey, "").(string)
if chatKey != "" {
chatVersion, _ := msg.Channel().OrgConfigForKey(chatbaseVersion, "").(string)
err := chatbase.SendChatbaseMessage(chatKey, chatVersion, chatbaseMessageType, dbMsg.ContactID_.String(), msg.Channel().Name(), msg.Text(), time.Now().UTC())
if err != nil {
logrus.WithError(err).WithField("chatbase_api_key", chatKey).WithField("chatbase_version", chatVersion).WithField("msg_id", dbMsg.ID().String()).Error("unable to write chatbase message")
}
}
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -513,10 +437,39 @@ func (b *backend) Heartbeat() error {
bulkSize += count
}

// log our total
// get our DB and redis stats
dbStats := b.db.Stats()
redisStats := b.redisPool.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.dbWaitDuration
dbWaitCountInPeriod := dbStats.WaitCount - b.dbWaitCount
redisWaitDurationInPeriod := redisStats.WaitDuration - b.redisWaitDuration
redisWaitCountInPeriod := redisStats.WaitCount - b.redisWaitCount

b.dbWaitDuration = dbStats.WaitDuration
b.dbWaitCount = dbStats.WaitCount
b.redisWaitDuration = redisStats.WaitDuration
b.redisWaitCount = redisStats.WaitCount

librato.Gauge("courier.db_busy", float64(dbStats.InUse))
librato.Gauge("courier.db_idle", float64(dbStats.Idle))
librato.Gauge("courier.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond))
librato.Gauge("courier.db_wait_count", float64(dbWaitCountInPeriod))
librato.Gauge("courier.redis_wait_ms", float64(redisWaitDurationInPeriod/time.Millisecond))
librato.Gauge("courier.redis_wait_count", float64(redisWaitCountInPeriod))
librato.Gauge("courier.bulk_queue", float64(bulkSize))
librato.Gauge("courier.priority_queue", float64(prioritySize))
logrus.WithField("bulk_queue", bulkSize).WithField("priority_queue", prioritySize).Info("heartbeat queue sizes calculated")

logrus.WithFields(logrus.Fields{
"db_busy": dbStats.InUse,
"db_idle": dbStats.Idle,
"db_wait_time": dbWaitDurationInPeriod,
"db_wait_count": dbWaitCountInPeriod,
"redis_wait_time": dbWaitDurationInPeriod,
"redis_wait_count": dbWaitCountInPeriod,
"priority_size": prioritySize,
"bulk_size": bulkSize,
}).Info("current analytics")

return nil
}
Expand Down Expand Up @@ -639,11 +592,11 @@ func (b *backend) Start() error {
// create our pool
redisPool := &redis.Pool{
Wait: true, // makes callers wait for a connection
MaxActive: 8, // only open this many concurrent connections at once
MaxActive: 36, // only open this many concurrent connections at once
MaxIdle: 4, // only keep up to this many idle
IdleTimeout: 240 * time.Second, // how long to wait before reaping a connection
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", fmt.Sprintf("%s", redisURL.Host))
conn, err := redis.Dial("tcp", redisURL.Host)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -727,7 +680,15 @@ func (b *backend) Start() error {
// create our status committer and start it
b.statusCommitter = batch.NewCommitter("status committer", b.db, bulkUpdateMsgStatusSQL, time.Millisecond*500, b.committerWG,
func(err error, value batch.Value) {
logrus.WithField("comp", "status committer").WithError(err).Error("error writing status")
log := logrus.WithField("comp", "status committer")

if qerr := dbutil.AsQueryError(err); qerr != nil {
query, params := qerr.Query()
log = log.WithFields(logrus.Fields{"sql": query, "sql_params": params})
}

log.WithError(err).Error("error writing status")

err = courier.WriteToSpool(b.config.SpoolDir, "statuses", value)
if err != nil {
logrus.WithField("comp", "status committer").WithError(err).Error("error writing status to spool")
Expand All @@ -738,7 +699,14 @@ func (b *backend) Start() error {
// create our log committer and start it
b.logCommitter = batch.NewCommitter("log committer", b.db, insertLogSQL, time.Millisecond*500, b.committerWG,
func(err error, value batch.Value) {
logrus.WithField("comp", "log committer").WithError(err).Error("error writing channel log")
log := logrus.WithField("comp", "log committer")

if qerr := dbutil.AsQueryError(err); qerr != nil {
query, params := qerr.Query()
log = log.WithFields(logrus.Fields{"sql": query, "sql_params": params})
}

log.WithError(err).Error("error writing channel log")
})
b.logCommitter.Start()

Expand Down Expand Up @@ -813,10 +781,13 @@ type backend struct {
db *sqlx.DB
redisPool *redis.Pool
storage storage.Storage
awsCreds *credentials.Credentials

popScript *redis.Script

stopChan chan bool
waitGroup *sync.WaitGroup

// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments
dbWaitDuration time.Duration
dbWaitCount int64
redisWaitDuration time.Duration
redisWaitCount int64
}
Loading