Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4386f41
LG-8440: Install aws-sdk-ruby; create migration for status check field
NavaTim May 4, 2023
e2d7229
LG-8440: Start writing job to check enrollment status update messages…
NavaTim May 4, 2023
55e8672
LG-8440: Install mail gem; update job to read email sent via SNS/SQS
NavaTim May 5, 2023
4c79a7d
LG-8440: Rename job; improve handling for errors and message deletion
NavaTim May 5, 2023
8b0a44f
LG-8440: Continue refining logic around error logging; break up logic…
NavaTim May 9, 2023
f3da7ef
LG-8440: Split out analytics methods; start writing specs for job
NavaTim May 9, 2023
10ea163
LG-8440: Test and refine error reporting module
NavaTim May 10, 2023
2cec8e4
LG-8440: Continue writing tests; improve docs and error logic
NavaTim May 11, 2023
fb3158e
LG-8440: Continue writing tests
NavaTim May 12, 2023
a40e2db
LG-8440: Write test for enrollment pipeline
NavaTim May 12, 2023
18fc6b1
LG-8440: Fix module imports and default config
NavaTim May 12, 2023
2cd9be2
LG-8440: Consolidate SQS client calls; update tests
NavaTim May 12, 2023
b69a7ba
LG-8440: Write spec for SQS batch processor
NavaTim May 13, 2023
da238a7
LG-8440: Format spec file; add method documentation to process_batch
NavaTim May 15, 2023
3026842
LG-8440: Add test for job; add new config value
NavaTim May 16, 2023
e5ccac3
LG-8440: Add config keys; rename key; lint fixes
NavaTim May 16, 2023
d7a829a
LG-8440: Update analytics params and documentation
NavaTim May 17, 2023
142053a
changelog: Internal, In-Person Proofing, Ingest in-person enrollment …
NavaTim May 17, 2023
19d6c56
LG-8440: Extract batch deletion into separate method
NavaTim May 17, 2023
400d935
LG-8440: Refactor modules to support dependency injection (WIP)
NavaTim May 18, 2023
c22ad65
LG-8440: Move factory methods to job; write tests for factory methods…
NavaTim May 18, 2023
b2e2fd7
LG-8440: Fix date logging and test
NavaTim May 19, 2023
b09dc3e
LG-8440: Increase the AWS HTTP read timeout for SQS client
NavaTim May 19, 2023
d573b9b
LG-8440: Update error reporter to use analytics directly
NavaTim May 19, 2023
853013c
LG-8440: Update to remove analytics factory
NavaTim May 19, 2023
1c7cc98
Update spec/jobs/in_person/enrollments_ready_for_status_check/enrollm…
NavaTim May 24, 2023
f160c10
Update app/jobs/in_person/enrollments_ready_for_status_check/sqs_batc…
NavaTim May 24, 2023
fbe34ee
LG-9905: Use blank? instead of unless for flag check
NavaTim May 25, 2023
5697fcf
LG-9905: Fix conditional and update test to catch issue
NavaTim May 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ gem 'aws-sdk-pinpoint'
gem 'aws-sdk-pinpointsmsvoice'
gem 'aws-sdk-ses', '~> 1.6'
gem 'aws-sdk-sns'
gem 'aws-sdk-sqs'
gem 'barby', '~> 0.6.8'
gem 'base32-crockford'
gem 'bootsnap', '~> 1.0', require: false
Expand All @@ -35,6 +36,7 @@ gem 'jwt'
gem 'lograge', '>= 0.11.2'
gem 'lookbook', '~> 1.5.3', require: false
gem 'lru_redux'
gem 'mail'
gem 'msgpack', '~> 1.6'
gem 'maxminddb'
gem 'multiset'
Expand Down
5 changes: 5 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ GEM
aws-sdk-sns (1.49.0)
aws-sdk-core (~> 3, >= 3.122.0)
aws-sigv4 (~> 1.1)
aws-sdk-sqs (1.53.0)
aws-sdk-core (~> 3, >= 3.165.0)
aws-sigv4 (~> 1.1)
aws-sigv4 (1.5.2)
aws-eventstream (~> 1, >= 1.0.2)
axe-core-api (4.3.2)
Expand Down Expand Up @@ -729,6 +732,7 @@ DEPENDENCIES
aws-sdk-pinpointsmsvoice
aws-sdk-ses (~> 1.6)
aws-sdk-sns
aws-sdk-sqs
axe-core-rspec (~> 4.2)
barby (~> 0.6.8)
base32-crockford
Expand Down Expand Up @@ -769,6 +773,7 @@ DEPENDENCIES
lograge (>= 0.11.2)
lookbook (~> 1.5.3)
lru_redux
mail
maxminddb
msgpack (~> 1.6)
multiset
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
module InPerson::EnrollmentsReadyForStatusCheck
class BatchProcessor
# @param [InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter] error_reporter
# @param [InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper] sqs_batch_wrapper
# @param [InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline] enrollment_pipeline
def initialize(error_reporter:, sqs_batch_wrapper:, enrollment_pipeline:)
@error_reporter = error_reporter
@sqs_batch_wrapper = sqs_batch_wrapper
@enrollment_pipeline = enrollment_pipeline
end

# Process a batch of incoming messages corresponding to in-person
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for including these comments, very useful!

# enrollments that are ready to have their status checked.
#
# Note: Stats are accepted as param to increment and facilitate logging even
# if this method raises an error.
#
# @param [Array<Aws::SQS::Types::Message>] messages In-person enrollment SQS messages
# @param [Hash] analytics_stats Counters for aggregating info about how items were processed
# @option analytics_stats [Integer] :fetched_items Items received from SQS
# @option analytics_stats [Integer] :valid_items Items matching the expected format/data
# @option analytics_stats [Integer] :invalid_items Items not matching the expected format/data
# @option analytics_stats [Integer] :processed_items Items processed without errors
# @option analytics_stats [Integer] :deleted_items Items successfully deleted from queue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these items that encountered errors when we attempted to process them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're deleting items in the following cases:

  • Processing succeeds
  • The item is malformed (e.g. invalid JSON or has wrong fields)
  • The item does not correspond to a pending enrollment record

If we don't delete the last two, then workers will continue attempting (and failing) to process them.

def process_batch(messages, analytics_stats)
analytics_stats[:fetched_items] += messages.size
# Keep messages to delete in an array for a batch call
deletion_batch = []
messages.each do |sqs_message|
if process_message(sqs_message)
analytics_stats[:valid_items] += 1
else
analytics_stats[:invalid_items] += 1
end

# Append messages to batch so we can dequeue any that we've processed.
#
# If we fail to process the message now but could process it later, then
# we should exclude that message from the deletion batch.
deletion_batch.append(sqs_message)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not getting why the append is happening outside of the above if statement? Based on the comment it feels like the sqs_message should only be appended if process_message returns true?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If process_message returns false, then the enrollment cannot be processed (e.g. invalid JSON, wrong fields, etc.). Keeping it in the queue would unnecessarily slow down the ingestion of other email notifications.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to Jack’s question-how does a message meet the condition described in the comment on line 38?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svalexander The message will meet the condition if an uncaught error occurs while processing the message. One example would be if the database becomes unavailable before we can update the enrollment status.

analytics_stats[:processed_items] += 1
end
ensure
# The messages were processed, so remove them from the queue
analytics_stats[:deleted_items] += process_deletions(deletion_batch)
end

private

attr_reader :error_reporter, :sqs_batch_wrapper, :enrollment_pipeline

delegate :report_error, to: :error_reporter
delegate :delete_message_batch, to: :sqs_batch_wrapper
delegate :process_message, to: :enrollment_pipeline

# Delete messages from the queue and report deletion errors
# @param [Array<Aws::SQS::Types::Message>] deletion_batch SQS messages to delete
# @return [Integer] Number of items deleted
def process_deletions(deletion_batch)
return 0 if deletion_batch.empty?

delete_result = delete_message_batch(deletion_batch)
delete_result.failed.each do |error_entry|
report_error(
'Failed to delete item from queue',
sqs_delete_error: error_entry.to_h,
)
end
delete_result.successful.size
rescue StandardError => err
report_error(err)
0
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
module InPerson::EnrollmentsReadyForStatusCheck
class EnrollmentPipeline
# @param [InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter] error_reporter
# @param [Regexp] email_body_pattern Pattern matching the expected email body
# Note: email_body_pattern must include a capture group named "enrollment_code"
def initialize(error_reporter:, email_body_pattern:)
@error_reporter = error_reporter
@email_body_pattern = email_body_pattern
end

# Process a message from USPS indicating that an in-person
# enrollment is ready to have its status checked.
#
# When a message can't be processed, then this function will return
# false to indicate that a problem occurred with the message. Otherwise it
# will return true. If an error occurs that can't be clearly identified
# as a problem with the message, then an error will be raised instead.
#
# When a message is successfully processed, then the corresponding
# InPersonEnrollment record will be marked as ready for a status check.
#
# @param [Aws::SQS::Types::Message] sqs_message
# @return [Boolean] Returns false for messages that can't be processed
# @raise [StandardError] Raised when an unhandled error occurs
def process_message(sqs_message)
error_extra = {
sqs_message_id: sqs_message.message_id,
}

# Unwrap SQS message to get SNS message
begin
sns_message = JSON.parse(sqs_message.body, { symbolize_names: true })
rescue JSON::JSONError => err
report_error(err, **error_extra)
return false
end

unless sns_message.is_a?(Hash) && sns_message.key?(:MessageId) && sns_message.key?(:Message)
report_error('SQS message body is not valid SNS payload', **error_extra)
return false
end

error_extra[:sns_message_id] = sns_message[:MessageId]

# Unwrap SNS message to get SES message
begin
ses_message = JSON.parse(sns_message[:Message], { symbolize_names: true })
rescue JSON::JSONError => err
report_error(err, **error_extra)
return false
end

unless ses_message.is_a?(Hash) && ses_message.key?(:content) && ses_message.key?(:mail)
report_error('SNS "Message" field is not a valid SES payload', **error_extra)
return false
end

# Add information to help with debugging
error_extra[:ses_aws_message_id] = ses_message.dig(:mail, :messageId)
error_extra[:ses_mail_timestamp] = ses_message.dig(:mail, :timestamp)
error_extra[:ses_mail_source] = ses_message.dig(:mail, :source)

# https://datatracker.ietf.org/doc/html/rfc5322#section-3.6.1
error_extra[:ses_rfc_origination_date] = ses_message.
dig(:mail, :commonHeaders, :date)&.then do |date|
DateTime.parse(date).to_s
end
# https://datatracker.ietf.org/doc/html/rfc5322#section-3.6.4
error_extra[:ses_rfc_message_id] = ses_message.dig(:mail, :commonHeaders, :messageId)

# Parse email from content of SES message
begin
mail = Mail.read_from_string(ses_message[:content])
# Depending on how the email is created, we may need to read different
# parts of the message
if mail.multipart?
text_body = mail.text_part&.decoded
else
text_body = mail.body.decoded
end
rescue StandardError
report_error(err, **error_extra)
return false
end

unless text_body.respond_to?(:to_s) && text_body.to_s.present?
report_error('Failure occurred when attempting to get email body', **error_extra)
return false
end

# Extract enrollment code from email body
enrollment_code = email_body_pattern.match(text_body.to_s)&.[](:enrollment_code)

unless enrollment_code.is_a?(String)
report_error(
'Failed to extract enrollment code using regex, check email body format and regex',
**error_extra,
)
return false
end

error_extra[:enrollment_code] = enrollment_code

# Look up existing enrollment
id, user_id, ready_for_status_check = InPersonEnrollment.
where(enrollment_code:, status: :pending).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we filter on status, later we update ready_for_status_check, are they the same thing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, they currently track different things. status tracks the overall progress of the enrollment creation, checking, and success/failure. ready_for_status_check only covers whether we received a message saying that the enrollment is ready to have its status checked (which is not tracked by status).

The check with USPS may result in a status update if USPS indicates that the enrollment is not present, has expired, or that the user has visited the post office already.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More context here - we are currently checking enrollments that have a pending status. When this and related stories are complete, we will be able to deprioritize (but still eventually check) enrollments that have ready_for_status_check remain as false.

order(created_at: :desc).
limit(1).
pick(
:id, :user_id, :ready_for_status_check
)

if id.nil?
report_error(
'Received code for enrollment that does not exist in the database',
**error_extra,
)
return false
end

error_extra[:enrollment_id] = id
error_extra[:user_id] = user_id

# SQS can deliver the message multiple times, so it's expected that
# sometimes ready_for_status_check will already be set to true.
unless ready_for_status_check
# Mark enrollment as ready for the USPS status check.
#
# Note: There is still a chance of duplicate updates at this point,
# but the conditional above should catch most cases.
InPersonEnrollment.update(id, ready_for_status_check: true)
end
return true
rescue StandardError => err
# Report and re-throw unhandled errors
report_error(err, **error_extra)
raise err
end
Comment on lines 134 to 138
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe unhandled errors in background jobs already go to NewRelic, do we need to notify directly like this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need additional direct error logging here in order to better capture the context of the error; this is most likely to occur after we have captured the enrollment code. However that part could be isolated to CloudWatch if you think that's particularly important.


private

attr_reader :error_reporter, :email_body_pattern
delegate :report_error, to: :error_reporter
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module InPerson
module EnrollmentsReadyForStatusCheck
class ErrorReporter
# @param [String] class_name Class for which to report errors
# @param [Analytics] analytics
def initialize(class_name, analytics)
@class_name = class_name
@analytics = analytics
end

# Reports an error. A non-StandardError will be converted to
# a RuntimeError before being reported.
# @param [#to_s,StandardError] error
def report_error(error, **extra)
error = RuntimeError.new("#{@class_name}: #{error}") unless error.is_a?(StandardError)
analytics.idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error(
exception_class: error.class,
exception_message: error.message,
**extra,
)
NewRelic::Agent.notice_error(error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My vote is to remove this direct NewRelic notification, we tend to react to errors in NR as if they are unhandled 500s so seeing errors like this that were caught might make something seem more urgent than it is

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of error signals that USPS is sending us garbage data or that AWS has become severely misconfigured. At the current scale it may not be consequential, but it could be extremely consequential at the scales we were preparing for earlier this year.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOW - if we are getting this at all, then it is very likely to be a problem that needs addressed soon. It doesn't mean we have to stop processing the other queue items, though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine keeping the NR notification, but we should reconsider if it turns out not to be useful

end

private

attr_reader :analytics
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module InPerson::EnrollmentsReadyForStatusCheck
class SqsBatchWrapper
# @param [Aws::SQS::Client] sqs_client AWS SQS Client
# @param [String] queue_url The URL identifying the SQS queue
# @param [Hash] receive_params Parameters passed to #receive_message
def initialize(sqs_client:, queue_url:, receive_params:)
@sqs_client = sqs_client
@queue_url = queue_url
@receive_params = receive_params
end

# Fetch a batch of messages from the SQS queue
# @return [Array<Aws::SQS::Types::Message>]
def poll
sqs_client.receive_message(receive_params).messages
end

# Delete the provided messages from the SQS queue
# @param [Array<Aws::SQS::Types::Message>] batch
# @return [Aws::SQS::Types::DeleteMessageBatchResult]
def delete_message_batch(batch)
sqs_client.delete_message_batch(
queue_url:,
entries: batch.map do |message|
{
id: message.message_id,
receipt_handle: message.receipt_handle,
}
end,
)
end

private

attr_reader :sqs_client, :queue_url, :receive_params
end
end
Loading