Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…date/v7.1.4
  • Loading branch information
Robi9 committed Jan 26, 2022
2 parents ca5afce + 4ce0680 commit cbfe998
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 174 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ dist/

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
fabric
deploy
fabfile.py
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
v7.1.4
----------
* Record flow on msgs

v7.1.3
----------
* Remove deletion of recent runs as these are no longer created

v7.1.2
----------
* Use run status instead of is_active and exit_type
* No longer include events in run archives

v7.1.1
----------
* Remove references to flowrun.parent_id which is no longer set by mailroom

v7.1.0
----------
* Remove msgs_msg.response_to_id

v7.0.0
----------
* Test on PG12 and 13
Expand Down
36 changes: 18 additions & 18 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func TestCreateMsgArchive(t *testing.T) {

// should have two records, second will have attachments
assert.Equal(t, 3, task.RecordCount)
assert.Equal(t, int64(483), task.Size)
assert.Equal(t, int64(528), task.Size)
assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), task.StartDate)
assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", task.Hash)
assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", task.Hash)
assertArchiveFile(t, task, "messages1.jsonl")

DeleteArchiveFile(task)
Expand All @@ -163,8 +163,8 @@ func TestCreateMsgArchive(t *testing.T) {

// should have one record
assert.Equal(t, 1, task.RecordCount)
assert.Equal(t, int64(290), task.Size)
assert.Equal(t, "a719c7ec64c516a6e159d26a70cb4225", task.Hash)
assert.Equal(t, int64(294), task.Size)
assert.Equal(t, "bd163ead077774425aa559e30d48ca87", task.Hash)
assertArchiveFile(t, task, "messages2.jsonl")

DeleteArchiveFile(task)
Expand Down Expand Up @@ -218,8 +218,8 @@ func TestCreateRunArchive(t *testing.T) {

// should have two record
assert.Equal(t, 2, task.RecordCount)
assert.Equal(t, int64(642), task.Size)
assert.Equal(t, "f793f863f5e060b9d67c5688a555da6a", task.Hash)
assert.Equal(t, int64(472), task.Size)
assert.Equal(t, "734d437e1c66d09e033d698c732178f8", task.Hash)
assertArchiveFile(t, task, "runs1.jsonl")

DeleteArchiveFile(task)
Expand All @@ -238,8 +238,8 @@ func TestCreateRunArchive(t *testing.T) {

// should have one record
assert.Equal(t, 1, task.RecordCount)
assert.Equal(t, int64(497), task.Size)
assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", task.Hash)
assert.Equal(t, int64(490), task.Size)
assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", task.Hash)
assertArchiveFile(t, task, "runs2.jsonl")

DeleteArchiveFile(task)
Expand Down Expand Up @@ -341,14 +341,14 @@ func TestArchiveOrgMessages(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), created[2].StartDate)
assert.Equal(t, DayPeriod, created[2].Period)
assert.Equal(t, 3, created[2].RecordCount)
assert.Equal(t, int64(483), created[2].Size)
assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", created[2].Hash)
assert.Equal(t, int64(528), created[2].Size)
assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", created[2].Hash)

assert.Equal(t, time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), created[3].StartDate)
assert.Equal(t, DayPeriod, created[3].Period)
assert.Equal(t, 1, created[3].RecordCount)
assert.Equal(t, int64(306), created[3].Size)
assert.Equal(t, "7ece4401d3afac9c08a913398f213ffa", created[3].Hash)
assert.Equal(t, int64(312), created[3].Size)
assert.Equal(t, "32e61b1431217b59fca0170f637d78a3", created[3].Hash)

assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[60].StartDate)
assert.Equal(t, DayPeriod, created[60].Period)
Expand All @@ -359,8 +359,8 @@ func TestArchiveOrgMessages(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[61].StartDate)
assert.Equal(t, MonthPeriod, created[61].Period)
assert.Equal(t, 4, created[61].RecordCount)
assert.Equal(t, int64(509), created[61].Size)
assert.Equal(t, "9e40be76913bf58655b70ee96dcac25d", created[61].Hash)
assert.Equal(t, int64(553), created[61].Size)
assert.Equal(t, "156e45e29b6587cb85ccf75e03800b00", created[61].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[62].StartDate)
assert.Equal(t, MonthPeriod, created[62].Period)
Expand Down Expand Up @@ -469,8 +469,8 @@ func TestArchiveOrgRuns(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate)
assert.Equal(t, MonthPeriod, created[0].Period)
assert.Equal(t, 1, created[0].RecordCount)
assert.Equal(t, int64(497), created[0].Size)
assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", created[0].Hash)
assert.Equal(t, int64(490), created[0].Size)
assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", created[0].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate)
assert.Equal(t, MonthPeriod, created[1].Period)
Expand All @@ -487,8 +487,8 @@ func TestArchiveOrgRuns(t *testing.T) {
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate)
assert.Equal(t, DayPeriod, created[11].Period)
assert.Equal(t, 2, created[11].RecordCount)
assert.Equal(t, int64(2002), created[11].Size)
assert.Equal(t, "b75d6ee33ce26b786f1b341e875ecd62", created[11].Hash)
assert.Equal(t, int64(1984), created[11].Size)
assert.Equal(t, "869cc00ad4cca0371d07c88d8cf2bf26", created[11].Hash)

assert.Equal(t, 12, len(deleted))

Expand Down
55 changes: 16 additions & 39 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ SELECT rec.visibility, row_to_json(rec) FROM (
row_to_json(contact) as contact,
CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn,
row_to_json(channel) as channel,
row_to_json(flow) as flow,
CASE WHEN direction = 'I' THEN 'in'
WHEN direction = 'O' THEN 'out'
ELSE NULL
Expand Down Expand Up @@ -61,6 +62,7 @@ SELECT rec.visibility, row_to_json(rec) FROM (
JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True
LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id
LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True
LEFT JOIN LATERAL (select uuid, name from flows_flow f where f.id = mm.flow_id) as flow ON True
LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True
WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3
Expand Down Expand Up @@ -123,12 +125,6 @@ DELETE FROM msgs_msg_labels
WHERE msg_id IN(?)
`

const unlinkResponses = `
UPDATE msgs_msg
SET response_to_id = NULL
WHERE response_to_id IN(?)
`

const deleteMessages = `
DELETE FROM msgs_msg
WHERE id IN(?)
Expand Down Expand Up @@ -189,75 +185,58 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}
rows.Close()

log.WithFields(logrus.Fields{
"msg_count": len(msgIDs),
}).Debug("found messages")
log.WithField("msg_count", len(msgIDs)).Debug("found messages")

// verify we don't see more messages than there are in our archive (fewer is ok)
if visibleCount > archive.RecordCount {
return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount)
}

// ok, delete our messages in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(msgIDs) {
endIdx = len(msgIDs)
}
batchIDs := msgIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// now delete any channel logs
err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
if err != nil {
return fmt.Errorf("error removing channel logs: %s", err.Error())
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
if err != nil {
return fmt.Errorf("error removing message labels: %s", err.Error())
}

// unlink any responses
err = executeInQuery(ctx, tx, unlinkResponses, batchIDs)
if err != nil {
return fmt.Errorf("error unlinking responses: %s", err.Error())
return errors.Wrap(err, "error removing message labels")
}

// finally, delete our messages
err = executeInQuery(ctx, tx, deleteMessages, batchIDs)
err = executeInQuery(ctx, tx, deleteMessages, idBatch)
if err != nil {
return fmt.Errorf("error deleting messages: %s", err.Error())
return errors.Wrap(err, "error deleting messages")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing message delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing message delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of messages")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages")

cancel()
}
Expand All @@ -270,14 +249,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting messages")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages")

return nil
}
Expand Down
Loading

0 comments on commit cbfe998

Please sign in to comment.