diff --git a/Gemfile b/Gemfile index 9cbb44f0a69..7b13bdf2869 100644 --- a/Gemfile +++ b/Gemfile @@ -11,6 +11,7 @@ gem 'aws-sdk-pinpoint' gem 'aws-sdk-pinpointsmsvoice' gem 'aws-sdk-ses', '~> 1.6' gem 'aws-sdk-sns' +gem 'aws-sdk-sqs' gem 'barby', '~> 0.6.8' gem 'base32-crockford' gem 'bootsnap', '~> 1.0', require: false @@ -35,6 +36,7 @@ gem 'jwt' gem 'lograge', '>= 0.11.2' gem 'lookbook', '~> 1.5.3', require: false gem 'lru_redux' +gem 'mail' gem 'msgpack', '~> 1.6' gem 'maxminddb' gem 'multiset' diff --git a/Gemfile.lock b/Gemfile.lock index ca2dafb71fd..f4757ec960f 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -164,6 +164,9 @@ GEM aws-sdk-sns (1.49.0) aws-sdk-core (~> 3, >= 3.122.0) aws-sigv4 (~> 1.1) + aws-sdk-sqs (1.53.0) + aws-sdk-core (~> 3, >= 3.165.0) + aws-sigv4 (~> 1.1) aws-sigv4 (1.5.2) aws-eventstream (~> 1, >= 1.0.2) axe-core-api (4.3.2) @@ -729,6 +732,7 @@ DEPENDENCIES aws-sdk-pinpointsmsvoice aws-sdk-ses (~> 1.6) aws-sdk-sns + aws-sdk-sqs axe-core-rspec (~> 4.2) barby (~> 0.6.8) base32-crockford @@ -769,6 +773,7 @@ DEPENDENCIES lograge (>= 0.11.2) lookbook (~> 1.5.3) lru_redux + mail maxminddb msgpack (~> 1.6) multiset diff --git a/app/jobs/in_person/enrollments_ready_for_status_check/batch_processor.rb b/app/jobs/in_person/enrollments_ready_for_status_check/batch_processor.rb new file mode 100644 index 00000000000..2098bda3cae --- /dev/null +++ b/app/jobs/in_person/enrollments_ready_for_status_check/batch_processor.rb @@ -0,0 +1,75 @@ +module InPerson::EnrollmentsReadyForStatusCheck + class BatchProcessor + # @param [InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter] error_reporter + # @param [InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper] sqs_batch_wrapper + # @param [InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline] enrollment_pipeline + def initialize(error_reporter:, sqs_batch_wrapper:, enrollment_pipeline:) + @error_reporter = error_reporter + @sqs_batch_wrapper = sqs_batch_wrapper + @enrollment_pipeline = enrollment_pipeline + end + + # Process a batch of incoming messages corresponding to in-person + # enrollments that are ready to have their status checked. + # + # Note: Stats are accepted as param to increment and facilitate logging even + # if this method raises an error. + # + # @param [Array] messages In-person enrollment SQS messages + # @param [Hash] analytics_stats Counters for aggregating info about how items were processed + # @option analytics_stats [Integer] :fetched_items Items received from SQS + # @option analytics_stats [Integer] :valid_items Items matching the expected format/data + # @option analytics_stats [Integer] :invalid_items Items not matching the expected format/data + # @option analytics_stats [Integer] :processed_items Items processed without errors + # @option analytics_stats [Integer] :deleted_items Items successfully deleted from queue + def process_batch(messages, analytics_stats) + analytics_stats[:fetched_items] += messages.size + # Keep messages to delete in an array for a batch call + deletion_batch = [] + messages.each do |sqs_message| + if process_message(sqs_message) + analytics_stats[:valid_items] += 1 + else + analytics_stats[:invalid_items] += 1 + end + + # Append messages to batch so we can dequeue any that we've processed. + # + # If we fail to process the message now but could process it later, then + # we should exclude that message from the deletion batch. + deletion_batch.append(sqs_message) + analytics_stats[:processed_items] += 1 + end + ensure + # The messages were processed, so remove them from the queue + analytics_stats[:deleted_items] += process_deletions(deletion_batch) + end + + private + + attr_reader :error_reporter, :sqs_batch_wrapper, :enrollment_pipeline + + delegate :report_error, to: :error_reporter + delegate :delete_message_batch, to: :sqs_batch_wrapper + delegate :process_message, to: :enrollment_pipeline + + # Delete messages from the queue and report deletion errors + # @param [Array] deletion_batch SQS messages to delete + # @return [Integer] Number of items deleted + def process_deletions(deletion_batch) + return 0 if deletion_batch.empty? + + delete_result = delete_message_batch(deletion_batch) + delete_result.failed.each do |error_entry| + report_error( + 'Failed to delete item from queue', + sqs_delete_error: error_entry.to_h, + ) + end + delete_result.successful.size + rescue StandardError => err + report_error(err) + 0 + end + end +end diff --git a/app/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline.rb b/app/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline.rb new file mode 100644 index 00000000000..a03d5b56e13 --- /dev/null +++ b/app/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline.rb @@ -0,0 +1,145 @@ +module InPerson::EnrollmentsReadyForStatusCheck + class EnrollmentPipeline + # @param [InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter] error_reporter + # @param [Regexp] email_body_pattern Pattern matching the expected email body + # Note: email_body_pattern must include a capture group named "enrollment_code" + def initialize(error_reporter:, email_body_pattern:) + @error_reporter = error_reporter + @email_body_pattern = email_body_pattern + end + + # Process a message from USPS indicating that an in-person + # enrollment is ready to have its status checked. + # + # When a message can't be processed, then this function will return + # false to indicate that a problem occurred with the message. Otherwise it + # will return true. If an error occurs that can't be clearly identified + # as a problem with the message, then an error will be raised instead. + # + # When a message is successfully processed, then the corresponding + # InPersonEnrollment record will be marked as ready for a status check. + # + # @param [Aws::SQS::Types::Message] sqs_message + # @return [Boolean] Returns false for messages that can't be processed + # @raise [StandardError] Raised when an unhandled error occurs + def process_message(sqs_message) + error_extra = { + sqs_message_id: sqs_message.message_id, + } + + # Unwrap SQS message to get SNS message + begin + sns_message = JSON.parse(sqs_message.body, { symbolize_names: true }) + rescue JSON::JSONError => err + report_error(err, **error_extra) + return false + end + + unless sns_message.is_a?(Hash) && sns_message.key?(:MessageId) && sns_message.key?(:Message) + report_error('SQS message body is not valid SNS payload', **error_extra) + return false + end + + error_extra[:sns_message_id] = sns_message[:MessageId] + + # Unwrap SNS message to get SES message + begin + ses_message = JSON.parse(sns_message[:Message], { symbolize_names: true }) + rescue JSON::JSONError => err + report_error(err, **error_extra) + return false + end + + unless ses_message.is_a?(Hash) && ses_message.key?(:content) && ses_message.key?(:mail) + report_error('SNS "Message" field is not a valid SES payload', **error_extra) + return false + end + + # Add information to help with debugging + error_extra[:ses_aws_message_id] = ses_message.dig(:mail, :messageId) + error_extra[:ses_mail_timestamp] = ses_message.dig(:mail, :timestamp) + error_extra[:ses_mail_source] = ses_message.dig(:mail, :source) + + # https://datatracker.ietf.org/doc/html/rfc5322#section-3.6.1 + error_extra[:ses_rfc_origination_date] = ses_message. + dig(:mail, :commonHeaders, :date)&.then do |date| + DateTime.parse(date).to_s + end + # https://datatracker.ietf.org/doc/html/rfc5322#section-3.6.4 + error_extra[:ses_rfc_message_id] = ses_message.dig(:mail, :commonHeaders, :messageId) + + # Parse email from content of SES message + begin + mail = Mail.read_from_string(ses_message[:content]) + # Depending on how the email is created, we may need to read different + # parts of the message + if mail.multipart? + text_body = mail.text_part&.decoded + else + text_body = mail.body.decoded + end + rescue StandardError + report_error(err, **error_extra) + return false + end + + unless text_body.respond_to?(:to_s) && text_body.to_s.present? + report_error('Failure occurred when attempting to get email body', **error_extra) + return false + end + + # Extract enrollment code from email body + enrollment_code = email_body_pattern.match(text_body.to_s)&.[](:enrollment_code) + + unless enrollment_code.is_a?(String) + report_error( + 'Failed to extract enrollment code using regex, check email body format and regex', + **error_extra, + ) + return false + end + + error_extra[:enrollment_code] = enrollment_code + + # Look up existing enrollment + id, user_id, ready_for_status_check = InPersonEnrollment. + where(enrollment_code:, status: :pending). + order(created_at: :desc). + limit(1). + pick( + :id, :user_id, :ready_for_status_check + ) + + if id.nil? + report_error( + 'Received code for enrollment that does not exist in the database', + **error_extra, + ) + return false + end + + error_extra[:enrollment_id] = id + error_extra[:user_id] = user_id + + # SQS can deliver the message multiple times, so it's expected that + # sometimes ready_for_status_check will already be set to true. + unless ready_for_status_check + # Mark enrollment as ready for the USPS status check. + # + # Note: There is still a chance of duplicate updates at this point, + # but the conditional above should catch most cases. + InPersonEnrollment.update(id, ready_for_status_check: true) + end + return true + rescue StandardError => err + # Report and re-throw unhandled errors + report_error(err, **error_extra) + raise err + end + + private + + attr_reader :error_reporter, :email_body_pattern + delegate :report_error, to: :error_reporter + end +end diff --git a/app/jobs/in_person/enrollments_ready_for_status_check/error_reporter.rb b/app/jobs/in_person/enrollments_ready_for_status_check/error_reporter.rb new file mode 100644 index 00000000000..31c2f2d2f1a --- /dev/null +++ b/app/jobs/in_person/enrollments_ready_for_status_check/error_reporter.rb @@ -0,0 +1,29 @@ +module InPerson + module EnrollmentsReadyForStatusCheck + class ErrorReporter + # @param [String] class_name Class for which to report errors + # @param [Analytics] analytics + def initialize(class_name, analytics) + @class_name = class_name + @analytics = analytics + end + + # Reports an error. A non-StandardError will be converted to + # a RuntimeError before being reported. + # @param [#to_s,StandardError] error + def report_error(error, **extra) + error = RuntimeError.new("#{@class_name}: #{error}") unless error.is_a?(StandardError) + analytics.idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error( + exception_class: error.class, + exception_message: error.message, + **extra, + ) + NewRelic::Agent.notice_error(error) + end + + private + + attr_reader :analytics + end + end +end diff --git a/app/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper.rb b/app/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper.rb new file mode 100644 index 00000000000..6c834b08b2f --- /dev/null +++ b/app/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper.rb @@ -0,0 +1,37 @@ +module InPerson::EnrollmentsReadyForStatusCheck + class SqsBatchWrapper + # @param [Aws::SQS::Client] sqs_client AWS SQS Client + # @param [String] queue_url The URL identifying the SQS queue + # @param [Hash] receive_params Parameters passed to #receive_message + def initialize(sqs_client:, queue_url:, receive_params:) + @sqs_client = sqs_client + @queue_url = queue_url + @receive_params = receive_params + end + + # Fetch a batch of messages from the SQS queue + # @return [Array] + def poll + sqs_client.receive_message(receive_params).messages + end + + # Delete the provided messages from the SQS queue + # @param [Array] batch + # @return [Aws::SQS::Types::DeleteMessageBatchResult] + def delete_message_batch(batch) + sqs_client.delete_message_batch( + queue_url:, + entries: batch.map do |message| + { + id: message.message_id, + receipt_handle: message.receipt_handle, + } + end, + ) + end + + private + + attr_reader :sqs_client, :queue_url, :receive_params + end +end diff --git a/app/jobs/in_person/enrollments_ready_for_status_check_job.rb b/app/jobs/in_person/enrollments_ready_for_status_check_job.rb new file mode 100644 index 00000000000..3c7c48d1988 --- /dev/null +++ b/app/jobs/in_person/enrollments_ready_for_status_check_job.rb @@ -0,0 +1,107 @@ +module InPerson + # This job checks a queue regularly to determine whether USPS has notitied us + # about whether an in-person enrollment is ready to have its status checked. If + # the enrollment is ready, then this job updates a flag on the enrollment so that it + # will be checked earlier than other enrollments. + class EnrollmentsReadyForStatusCheckJob < ApplicationJob + queue_as :low + + def perform(_now) + return true if IdentityConfig.store.in_person_proofing_enabled.blank? || + IdentityConfig.store.in_person_enrollments_ready_job_enabled.blank? + + begin + analytics.idv_in_person_proofing_enrollments_ready_for_status_check_job_started + + analytics_stats = { + fetched_items: 0, + processed_items: 0, + deleted_items: 0, + valid_items: 0, + invalid_items: 0, + } + + # Continually request messages until no messages are received + while (messages = poll).any? + process_batch(messages, analytics_stats) + end + return true + ensure + analytics.idv_in_person_proofing_enrollments_ready_for_status_check_job_completed( + **analytics_stats, + incomplete_items: + analytics_stats[:fetched_items] - analytics_stats[:processed_items], + deletion_failed_items: + analytics_stats[:processed_items] - analytics_stats[:deleted_items], + ) + end + end + + private + + delegate :poll, to: :sqs_batch_wrapper + delegate :process_batch, to: :batch_processor + delegate :analytics, to: :analytics_factory + + def batch_processor + @batch_processor ||= begin + InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor.new( + error_reporter: InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter.new( + InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor.name, + analytics, + ), + sqs_batch_wrapper: sqs_batch_wrapper, + enrollment_pipeline: InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline.new( + error_reporter: InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter.new( + InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline.name, + analytics, + ), + email_body_pattern: Regexp.new( + # Regex pattern describing the expected email format. + # This should include an "enrollment_code" capture group. + IdentityConfig.store.in_person_enrollments_ready_job_email_body_pattern, + ), + ), + ) + end + end + + def sqs_batch_wrapper + @sqs_batch_wrapper ||= begin + config = IdentityConfig.store + queue_url = config.in_person_enrollments_ready_job_queue_url + max_number_of_messages = config.in_person_enrollments_ready_job_max_number_of_messages + visibility_timeout = config.in_person_enrollments_ready_job_visibility_timeout_seconds + wait_time_seconds = config.in_person_enrollments_ready_job_wait_time_seconds + + # The queue will need to remain connected for at least the duration set + # by wait_time_seconds, which may conflict with aws_http_timeout. + # + # Adding the two together here to create a buffer. + http_read_timeout = config.aws_http_timeout + wait_time_seconds + + InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper.new( + sqs_client: Aws::SQS::Client.new( + http_read_timeout:, + ), + queue_url:, + receive_params: { + queue_url:, + max_number_of_messages:, + visibility_timeout:, + wait_time_seconds:, + }, + ) + end + end + + def analytics + @analytics ||= Analytics.new( + user: AnonymousUser.new, + request: nil, + session: {}, + sp: nil, + ) + end + end +end diff --git a/app/services/analytics_events.rb b/app/services/analytics_events.rb index 5d8db363b16..457a6dd8406 100644 --- a/app/services/analytics_events.rb +++ b/app/services/analytics_events.rb @@ -947,6 +947,62 @@ def idv_in_person_ready_to_verify_what_to_bring_link_clicked(**extra) ) end + # A job to check USPS notifications about in-person enrollment status updates has started + def idv_in_person_proofing_enrollments_ready_for_status_check_job_started(**extra) + track_event( + 'InPersonEnrollmentsReadyForStatusCheckJob: Job started', + **extra, + ) + end + + # A job to check USPS notifications about in-person enrollment status updates has completed + # @param [Integer] fetched_items items fetched + # @param [Integer] processed_items items fetched and processed + # @param [Integer] deleted_items items fetched, processed, and then deleted from the queue + # @param [Integer] valid_items items that could be successfully used to update a record + # @param [Integer] invalid_items items that couldn't be used to update a record + # @param [Integer] incomplete_items fetched items not processed nor deleted from the queue + # @param [Integer] deletion_failed_items processed items that we failed to delete + def idv_in_person_proofing_enrollments_ready_for_status_check_job_completed( + fetched_items:, + processed_items:, + deleted_items:, + valid_items:, + invalid_items:, + incomplete_items:, + deletion_failed_items:, + **extra + ) + track_event( + 'InPersonEnrollmentsReadyForStatusCheckJob: Job completed', + fetched_items:, + processed_items:, + deleted_items:, + valid_items:, + invalid_items:, + incomplete_items:, + deletion_failed_items:, + **extra, + ) + end + + # A job to check USPS notifications about in-person enrollment status updates + # has encountered an error + # @param [String] exception_class + # @param [String] exception_message + def idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error( + exception_class:, + exception_message:, + **extra + ) + track_event( + 'InPersonEnrollmentsReadyForStatusCheckJob: Ingestion error', + exception_class:, + exception_message:, + **extra, + ) + end + # User has consented to share information with document upload and may # view the "hybrid handoff" step next unless "skip_upload" param is true def idv_doc_auth_agreement_submitted(**extra) diff --git a/config/application.yml.default b/config/application.yml.default index 409ca9adcbe..baf209db88f 100644 --- a/config/application.yml.default +++ b/config/application.yml.default @@ -139,6 +139,13 @@ in_person_email_reminder_final_benchmark_in_days: 1 in_person_email_reminder_late_benchmark_in_days: 4 in_person_proofing_enabled: false in_person_enrollment_validity_in_days: 30 +in_person_enrollments_ready_job_email_body_pattern: '\A\s*(?\d{16})\s*\Z' +in_person_enrollments_ready_job_cron: '0/10 * * * *' +in_person_enrollments_ready_job_enabled: false +in_person_enrollments_ready_job_queue_url: '' +in_person_enrollments_ready_job_max_number_of_messages: 10 +in_person_enrollments_ready_job_visibility_timeout_seconds: 30 +in_person_enrollments_ready_job_wait_time_seconds: 20 in_person_results_delay_in_hours: 1 in_person_completion_survey_url: 'https://login.gov' in_person_usps_outage_message_enabled: false diff --git a/config/initializers/job_configurations.rb b/config/initializers/job_configurations.rb index 833f94744d9..675bb6c44ec 100644 --- a/config/initializers/job_configurations.rb +++ b/config/initializers/job_configurations.rb @@ -104,6 +104,12 @@ class: 'HeartbeatJob', cron: cron_5m, }, + # Queue usps in-person visit notifications job to GoodJob + in_person_enrollments_ready_for_status_check_job: { + class: 'InPerson::EnrollmentsReadyForStatusCheckJob', + cron: IdentityConfig.store.in_person_enrollments_ready_job_cron, + args: -> { [Time.zone.now] }, + }, # Queue usps proofing job to GoodJob get_usps_proofing_results_job: { class: 'GetUspsProofingResultsJob', diff --git a/db/primary_migrate/20230503231037_add_ready_for_status_check_to_in_person_enrollments.rb b/db/primary_migrate/20230503231037_add_ready_for_status_check_to_in_person_enrollments.rb new file mode 100644 index 00000000000..db8b2e0918c --- /dev/null +++ b/db/primary_migrate/20230503231037_add_ready_for_status_check_to_in_person_enrollments.rb @@ -0,0 +1,7 @@ +class AddReadyForStatusCheckToInPersonEnrollments < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + def change + add_column :in_person_enrollments, :ready_for_status_check, :boolean, default: false + add_index :in_person_enrollments, :ready_for_status_check, where: "(ready_for_status_check = true)", algorithm: :concurrently + end +end diff --git a/db/schema.rb b/db/schema.rb index 6cd4ba7f485..0135afe0ba2 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2023_05_02_235856) do +ActiveRecord::Schema[7.0].define(version: 2023_05_03_231037) do # These are extensions that must be enabled in order to support this database enable_extension "pg_stat_statements" enable_extension "pgcrypto" @@ -308,7 +308,9 @@ t.datetime "proofed_at", precision: nil, comment: "timestamp when user attempted to proof at a Post Office" t.boolean "capture_secondary_id_enabled", default: false, comment: "record and proof state ID and residential addresses separately" t.datetime "status_check_completed_at", comment: "The last time a status check was successfully completed" + t.boolean "ready_for_status_check", default: false t.index ["profile_id"], name: "index_in_person_enrollments_on_profile_id" + t.index ["ready_for_status_check"], name: "index_in_person_enrollments_on_ready_for_status_check", where: "(ready_for_status_check = true)" t.index ["status_check_attempted_at"], name: "index_in_person_enrollments_on_status_check_attempted_at", where: "(status = 1)" t.index ["unique_id"], name: "index_in_person_enrollments_on_unique_id", unique: true t.index ["user_id", "status"], name: "index_in_person_enrollments_on_user_id_and_status", unique: true, where: "(status = 1)" diff --git a/lib/identity_config.rb b/lib/identity_config.rb index d8f59420682..e832cecdbf7 100644 --- a/lib/identity_config.rb +++ b/lib/identity_config.rb @@ -218,6 +218,13 @@ def self.build_store(config_map) config.add(:in_person_email_reminder_late_benchmark_in_days, type: :integer) config.add(:in_person_proofing_enabled, type: :boolean) config.add(:in_person_enrollment_validity_in_days, type: :integer) + config.add(:in_person_enrollments_ready_job_email_body_pattern, type: :string) + config.add(:in_person_enrollments_ready_job_cron, type: :string) + config.add(:in_person_enrollments_ready_job_enabled, type: :boolean) + config.add(:in_person_enrollments_ready_job_queue_url, type: :string) + config.add(:in_person_enrollments_ready_job_max_number_of_messages, type: :integer) + config.add(:in_person_enrollments_ready_job_visibility_timeout_seconds, type: :integer) + config.add(:in_person_enrollments_ready_job_wait_time_seconds, type: :integer) config.add(:in_person_results_delay_in_hours, type: :integer) config.add(:in_person_completion_survey_url, type: :string) config.add(:in_person_usps_outage_message_enabled, type: :boolean) diff --git a/spec/jobs/in_person/enrollments_ready_for_status_check/batch_processor_spec.rb b/spec/jobs/in_person/enrollments_ready_for_status_check/batch_processor_spec.rb new file mode 100644 index 00000000000..6eced3d0342 --- /dev/null +++ b/spec/jobs/in_person/enrollments_ready_for_status_check/batch_processor_spec.rb @@ -0,0 +1,229 @@ +require 'rails_helper' + +RSpec.describe InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor do + let(:messages) { [] } + let(:analytics_stats) do + { + fetched_items: 0, + processed_items: 0, + deleted_items: 0, + valid_items: 0, + invalid_items: 0, + } + end + + let(:error_reporter) { instance_double(InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter) } + let(:sqs_batch_wrapper) do + instance_double(InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper) + end + let(:enrollment_pipeline) do + instance_double(InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline) + end + + subject(:batch_processor) do + described_class.new( + error_reporter:, + sqs_batch_wrapper:, + enrollment_pipeline:, + ) + end + + describe '#process_batch' do + let(:delete_result) { instance_double(Aws::SQS::Types::DeleteMessageBatchResult) } + + def successful_delete + instance_double(Aws::SQS::Types::DeleteMessageBatchResultEntry) + end + + def failed_delete + instance_double(Aws::SQS::Types::BatchResultErrorEntry) + end + + it 'ignores an empty batch' do + expect(enrollment_pipeline).not_to receive(:process_message) + expect(sqs_batch_wrapper).not_to receive(:delete_message_batch) + expect(error_reporter).not_to receive(:report_error) + expected_analytics_stats = analytics_stats.dup + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + + context 'one message' do + let(:message) { instance_double(Aws::SQS::Types::Message) } + let(:messages) { [message] } + + it 'unhandled exception does not delete item' do + error = RuntimeError.new 'test error' + expect(enrollment_pipeline).to receive(:process_message).with(message).and_raise(error).once + expect(sqs_batch_wrapper).not_to receive(:delete_message_batch) + expect(error_reporter).not_to receive(:report_error) + expected_analytics_stats = { + **analytics_stats, + fetched_items: 1, + } + expect do + batch_processor.process_batch(messages, analytics_stats) + end.to raise_error(error) + expect(analytics_stats).to eq(expected_analytics_stats) + end + + it 'invalid item is marked as processed and deleted' do + expect(enrollment_pipeline).to receive(:process_message). + with(message).and_return(false).once + expect(sqs_batch_wrapper).to receive(:delete_message_batch). + with(messages).and_return(delete_result).once + expect(delete_result).to receive(:failed).and_return([]) + expect(delete_result).to receive(:successful).and_return(messages) + expect(error_reporter).not_to receive(:report_error) + expected_analytics_stats = { + **analytics_stats, + fetched_items: 1, + invalid_items: 1, + deleted_items: 1, + processed_items: 1, + } + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + + it 'valid item is marked as processed and deleted' do + expect(enrollment_pipeline).to receive(:process_message). + with(message).and_return(true).once + expect(sqs_batch_wrapper).to receive(:delete_message_batch). + with(messages).and_return(delete_result).once + expect(delete_result).to receive(:failed).and_return([]) + expect(delete_result).to receive(:successful).and_return( + [ + successful_delete, + ], + ) + expect(error_reporter).not_to receive(:report_error) + expected_analytics_stats = { + **analytics_stats, + fetched_items: 1, + valid_items: 1, + deleted_items: 1, + processed_items: 1, + } + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + + it 'item is marked as processed but fails to be deleted' do + expect(enrollment_pipeline).to receive(:process_message). + with(message).and_return(true).once + expect(sqs_batch_wrapper).to receive(:delete_message_batch). + with(messages).and_return(delete_result).once + error_entry = failed_delete + expect(delete_result).to receive(:failed).and_return( + [ + error_entry, + ], + ) + error_entry_hash = { + id: 123, + } + expect(error_entry).to receive(:to_h).and_return(error_entry_hash) + expect(delete_result).to receive(:successful).and_return([]) + expect(error_reporter).to receive(:report_error).with( + 'Failed to delete item from queue', + sqs_delete_error: error_entry_hash, + ).once + expected_analytics_stats = { + **analytics_stats, + fetched_items: 1, + valid_items: 1, + deleted_items: 0, + processed_items: 1, + } + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + + it 'item is marked as processed but the batch delete call throws an error' do + error = RuntimeError.new 'test batch error' + expect(enrollment_pipeline).to receive(:process_message). + with(message).and_return(true).once + expect(sqs_batch_wrapper).to receive(:delete_message_batch). + with(messages).and_raise(error).once + expect(error_reporter).to receive(:report_error).with(error).once + expected_analytics_stats = { + **analytics_stats, + fetched_items: 1, + valid_items: 1, + deleted_items: 0, + processed_items: 1, + } + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + end + + context 'multiple messages' do + let(:message) { instance_double(Aws::SQS::Types::Message) } + let(:messages) do + [ + instance_double(Aws::SQS::Types::Message), + instance_double(Aws::SQS::Types::Message), + instance_double(Aws::SQS::Types::Message), + instance_double(Aws::SQS::Types::Message), + instance_double(Aws::SQS::Types::Message), + ] + end + + it 'handles combined valid, invalid, and non-deleted messages' do + expect(enrollment_pipeline).to receive(:process_message).and_return( + true, + false, + true, + true, + true, + ).exactly(5).times + expect(sqs_batch_wrapper).to receive(:delete_message_batch). + with(messages).and_return(delete_result).once + + error_entry = failed_delete + error_entry2 = failed_delete + expect(delete_result).to receive(:failed).and_return( + [ + error_entry, + error_entry2, + ], + ) + error_entry_hash = { + id: 123, + } + error_entry_hash2 = { + id: 456, + } + expect(error_entry).to receive(:to_h).and_return(error_entry_hash) + expect(error_entry2).to receive(:to_h).and_return(error_entry_hash2) + expect(delete_result).to receive(:successful).and_return( + [ + successful_delete, + successful_delete, + successful_delete, + ], + ) + expect(error_reporter).to receive(:report_error).with( + 'Failed to delete item from queue', + sqs_delete_error: error_entry_hash, + ).once + expect(error_reporter).to receive(:report_error).with( + 'Failed to delete item from queue', + sqs_delete_error: error_entry_hash2, + ).once + expected_analytics_stats = { + **analytics_stats, + fetched_items: 5, + valid_items: 4, + invalid_items: 1, + deleted_items: 3, + processed_items: 5, + } + batch_processor.process_batch(messages, analytics_stats) + expect(analytics_stats).to eq(expected_analytics_stats) + end + end + end +end diff --git a/spec/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline_spec.rb b/spec/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline_spec.rb new file mode 100644 index 00000000000..e8df2c9ebb4 --- /dev/null +++ b/spec/jobs/in_person/enrollments_ready_for_status_check/enrollment_pipeline_spec.rb @@ -0,0 +1,335 @@ +require 'rails_helper' + +RSpec.describe InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline do + let(:error_reporter) { instance_double(InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter) } + let(:email_body_pattern) { /\A\s*(?\d{16})\s*\Z/ } + subject(:enrollment_pipeline) { described_class.new(error_reporter:, email_body_pattern:) } + + before(:each) do + allow(IdentityConfig.store).to receive(:in_person_enrollments_ready_job_email_body_pattern). + and_return('\A\s*(?\d{16})\s*\Z') + + allow(error_reporter).to receive(:report_error) + end + + describe '#process_message' do + let(:sqs_message) { instance_double(Aws::SQS::Types::Message) } + let(:sqs_message_id) { Random.uuid } + let(:sns_message_id) { Random.uuid } + let(:enrollment_code) { 16.times.map { rand(0..9) }.join } + let(:user) { create(:user) } + let(:user_id) { user.id } + let(:mail_date) { 16.hours.ago.to_datetime } + let(:ses_payload) do + { + content: Mail.new do |m| + m.text_part = enrollment_code + end.to_s, + mail: { + messageId: Random.uuid.delete('-'), + timestamp: DateTime.now.to_s, + source: 'testsource@example.com', + commonHeaders: { + date: Mail::DateField.new(mail_date).to_s, + messageId: Mail::Utilities.generate_message_id, + }, + }, + } + end + let(:logged_ses_values) do + { + ses_aws_message_id: ses_payload[:mail][:messageId], + ses_mail_source: ses_payload[:mail][:source], + ses_mail_timestamp: ses_payload[:mail][:timestamp], + ses_rfc_message_id: ses_payload[:mail][:commonHeaders][:messageId], + ses_rfc_origination_date: mail_date.to_s, + } + end + let(:sns_payload) do + { + MessageId: sns_message_id, + Message: ses_payload.to_json, + } + end + let(:expected_error) { nil } + let(:expected_error_extra) { nil } + + before(:each) do + allow(sqs_message).to receive(:message_id). + and_return(sqs_message_id) + end + + def expect_error(error, **extra) + expect(error_reporter).to receive(:report_error) do |err, err_extra| + expect(err).to eq(error) if error.is_a?(String) + expect(err.message).to eq(error.message) if error.is_a?(StandardError) + expect(err.class).to eq(error.class) + expect(err_extra).to eq(extra) + end + expect(enrollment_pipeline.process_message(sqs_message)).to be(false) + end + + context 'reports error and returns false' do + it 'SQS message is not JSON' do + allow(sqs_message).to receive(:body).and_return('not json') + expect_error(JSON::ParserError.new("unexpected token at 'not json'"), sqs_message_id:) + end + + it 'SQS message body is not a hash' do + allow(sqs_message).to receive(:body).and_return('abcd'.to_json) + expect_error('SQS message body is not valid SNS payload', sqs_message_id:) + end + + it 'SNS message is missing MessageId' do + allow(sqs_message).to receive(:body).and_return( + { + Message: 'abcd', + }.to_json, + ) + expect_error('SQS message body is not valid SNS payload', sqs_message_id:) + end + + it 'SNS message is missing Message' do + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + }.to_json, + ) + expect_error('SQS message body is not valid SNS payload', sqs_message_id:) + end + + it 'SNS "Message" field is not JSON' do + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: 'not json', + }.to_json, + ) + expect_error( + JSON::ParserError.new("unexpected token at 'not json'"), sqs_message_id:, + sns_message_id: + ) + end + + it 'SES message is not a hash' do + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: 'abcd'.to_json, + }.to_json, + ) + expect_error( + 'SNS "Message" field is not a valid SES payload', sqs_message_id:, + sns_message_id: + ) + end + + it 'SES message is missing "content" key' do + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: { + mail: {}, + }.to_json, + }.to_json, + ) + expect_error( + 'SNS "Message" field is not a valid SES payload', sqs_message_id:, + sns_message_id: + ) + end + + it 'SES message is missing "mail" key' do + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: { + content: 'abcd', + }.to_json, + }.to_json, + ) + expect_error( + 'SNS "Message" field is not a valid SES payload', + sqs_message_id:, + sns_message_id:, + ) + end + + it 'email content key is missing' do + payload = ses_payload + payload.delete(:content) + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: { + **payload, + }.to_json, + }.to_json, + ) + expect_error( + 'SNS "Message" field is not a valid SES payload', sqs_message_id:, + sns_message_id: + ) + end + + it 'email body is missing (single part)' do + message = Mail.new + expect(message.multipart?).to be(false) + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: { + **ses_payload, + content: message.to_s, + }.to_json, + }.to_json, + ) + expect_error( + 'Failure occurred when attempting to get email body', + sqs_message_id:, + sns_message_id:, + **logged_ses_values, + ) + end + + it 'email body is missing (multipart)' do + message = Mail.new do + html_part do + body nil + end + text_part do + body nil + end + end + expect(message.multipart?).to be(true) + allow(sqs_message).to receive(:body).and_return( + { + MessageId: sns_message_id, + Message: { + **ses_payload, + content: message.to_s, + }.to_json, + }.to_json, + ) + expect_error( + 'Failure occurred when attempting to get email body', + sqs_message_id:, + sns_message_id:, + **logged_ses_values, + ) + end + + it 'email body does not match pattern' do + allow(sqs_message).to receive(:body).and_return( + { + **sns_payload, + Message: { + **ses_payload, + content: Mail.new do |m| + m.text_part = 'abcd' + end.to_s, + }.to_json, + }.to_json, + ) + expect_error( + 'Failed to extract enrollment code using regex, check email body format and regex', + sqs_message_id:, + sns_message_id:, + **logged_ses_values, + ) + end + + it 'enrollment does not exist' do + allow(sqs_message).to receive(:body).and_return(sns_payload.to_json) + expect_error( + 'Received code for enrollment that does not exist in the database', + sqs_message_id:, + sns_message_id:, + enrollment_code:, + **logged_ses_values, + ) + end + end + + context 'reports and rethrows unhandled errors' do + it 'error thrown trying to fetch enrollment' do + allow(sqs_message).to receive(:body).and_return(sns_payload.to_json) + error = ActiveRecord::ConnectionNotEstablished.new + expect(InPersonEnrollment).to receive_message_chain( + :where, + :order, + :limit, + :pick, + ).and_raise(error) + + expect(error_reporter).to receive(:report_error). + with( + error, + sqs_message_id:, + sns_message_id:, + enrollment_code:, + **logged_ses_values, + ) + + expect do + enrollment_pipeline.process_message(sqs_message) + end.to raise_error(error) + end + + it 'error thrown trying to update enrollment' do + allow(sqs_message).to receive(:body).and_return(sns_payload.to_json) + + enrollment = create(:in_person_enrollment, enrollment_code:, status: :pending, user:) + + error = ActiveRecord::ConnectionNotEstablished.new + expect(InPersonEnrollment).to receive(:update). + with(enrollment.id, ready_for_status_check: true). + and_raise(error) + + expect(error_reporter).to receive(:report_error). + with( + error, + sqs_message_id:, + sns_message_id:, + enrollment_code:, + user_id:, + enrollment_id: enrollment.id, + **logged_ses_values, + ) + + expect do + enrollment_pipeline.process_message(sqs_message) + end.to raise_error(error) + end + end + + context 'returns true for records handled as expected' do + it 'marks non-ready record as ready' do + allow(sqs_message).to receive(:body).and_return(sns_payload.to_json) + + enrollment = create(:in_person_enrollment, enrollment_code:, status: :pending, user:) + + expect(InPersonEnrollment).to receive(:update). + with(enrollment.id, ready_for_status_check: true).once + + expect(error_reporter).not_to receive(:report_error) + + expect(enrollment_pipeline.process_message(sqs_message)).to be(true) + end + it 'leaves record already marked as ready' do + allow(sqs_message).to receive(:body).and_return(sns_payload.to_json) + + create( + :in_person_enrollment, enrollment_code:, status: :pending, user:, + ready_for_status_check: true + ) + + expect(InPersonEnrollment).not_to receive(:update) + + expect(error_reporter).not_to receive(:report_error) + + expect(enrollment_pipeline.process_message(sqs_message)).to be(true) + end + end + end +end diff --git a/spec/jobs/in_person/enrollments_ready_for_status_check/error_reporter_spec.rb b/spec/jobs/in_person/enrollments_ready_for_status_check/error_reporter_spec.rb new file mode 100644 index 00000000000..1c5b4a9707e --- /dev/null +++ b/spec/jobs/in_person/enrollments_ready_for_status_check/error_reporter_spec.rb @@ -0,0 +1,102 @@ +require 'rails_helper' + +RSpec.describe InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter do + let(:class_name_suffix) { "TestClass#{[*('A'..'Z'), *('a'..'z')].sample(10).join}" } + + # We need the class name since it's part of what we're logging + let(:class_name) { "#{described_class.name.demodulize}#{class_name_suffix}" } + + let(:analytics) { FakeAnalytics.new } + subject(:error_reporter) { described_class.new(class_name, analytics) } + let(:analytics_extra) { nil } + let(:expected_error_class) { nil } + let(:expected_message) { nil } + + before(:each) do + allow(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error, + ) + allow(NewRelic::Agent).to receive(:notice_error) + end + + def it_generates_and_records_the_error + expect(analytics).to have_received( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error, + ).once do |exception_class:, exception_message:, **extra| + expect(exception_class).to eq(expected_error_class) + expect(exception_message).to eq(expected_message) + end + expect(NewRelic::Agent).to have_received(:notice_error).once do |error| + expect(error).to be_instance_of(expected_error_class) + expect(error.message).to eq(expected_message) + end + end + + def it_passes_expected_attributes_to_analytics + expect(analytics).to have_received( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_ingestion_error, + ).once do |exception_class:, exception_message:, **extra| + expect(extra).to eq(analytics_extra) + end + end + + describe '#report_error' do + context 'given a string message' do + let(:string_message) { 'my string message here' } + let(:expected_error_class) { RuntimeError } + let(:expected_message) { "#{class_name}: #{string_message}" } + let(:analytics_extra) { {} } + + before(:each) do + error_reporter.report_error(string_message, **analytics_extra) + end + + it 'generates an error object and records the error' do + it_generates_and_records_the_error + end + it 'passes the expected data to analytics' do + it_passes_expected_attributes_to_analytics + end + + context 'with extra analytics data' do + let(:analytics_extra) { { test: 'abcd' } } + + it 'generates an error object and records the error' do + it_generates_and_records_the_error + end + it 'passes the expected data to analytics' do + it_passes_expected_attributes_to_analytics + end + end + end + + context 'given an error' do + let(:error) { ArgumentError.new(expected_message) } + let(:expected_error_class) { ArgumentError } + let(:expected_message) { 'my test message' } + let(:analytics_extra) { {} } + + before(:each) do + error_reporter.report_error(error, **analytics_extra) + end + + it 'generates an error object and records the error' do + it_generates_and_records_the_error + end + it 'passes the expected data to analytics' do + it_passes_expected_attributes_to_analytics + end + + context 'with extra analytics data' do + let(:analytics_extra) { { test: 'abcd' } } + + it 'generates an error object and records the error' do + it_generates_and_records_the_error + end + it 'passes the expected data to analytics' do + it_passes_expected_attributes_to_analytics + end + end + end + end +end diff --git a/spec/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper_spec.rb b/spec/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper_spec.rb new file mode 100644 index 00000000000..f9ae2baf538 --- /dev/null +++ b/spec/jobs/in_person/enrollments_ready_for_status_check/sqs_batch_wrapper_spec.rb @@ -0,0 +1,91 @@ +require 'rails_helper' + +RSpec.describe InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper do + let(:queue_url) { 'my/test/queue/url' } + let(:sqs_client) { instance_double(Aws::SQS::Client) } + let(:receive_params) do + { + queue_url:, + max_number_of_messages: 10, + visibility_timeout: 30, + wait_time_seconds: 20, + } + end + subject(:sqs_batch_wrapper) { described_class.new(sqs_client:, queue_url:, receive_params:) } + + def create_mock_message + instance_double(Aws::SQS::Types::Message) + end + + describe '#poll' do + it 'polls SQS and returns messages' do + mock_result = instance_double(Aws::SQS::Types::ReceiveMessageResult) + mock_messages = [ + create_mock_message, + create_mock_message, + create_mock_message, + ] + + expect(sqs_client).to receive(:receive_message). + with(receive_params). + and_return(mock_result) + + expect(mock_result).to receive(:messages).and_return(mock_messages) + expect(sqs_batch_wrapper.poll).to eq(mock_messages) + end + end + + describe '#delete_message_batch' do + it 'deletes a batch of 1 from SQS' do + message_id = Random.uuid + receipt_handle = Random.uuid + message = create_mock_message + allow(message).to receive(:message_id).and_return(message_id) + allow(message).to receive(:receipt_handle).and_return(receipt_handle) + deletion_result = instance_double(Aws::SQS::Types::DeleteMessageBatchResult) + expect(sqs_client).to receive(:delete_message_batch).with( + { + queue_url:, + entries: [ + { + id: message_id, + receipt_handle: receipt_handle, + }, + ], + }, + ).and_return(deletion_result) + expect(sqs_batch_wrapper.delete_message_batch([message])).to be(deletion_result) + end + it 'deletes a batch of 2 from SQS' do + message_id = Random.uuid + receipt_handle = Random.uuid + message = create_mock_message + allow(message).to receive(:message_id).and_return(message_id) + allow(message).to receive(:receipt_handle).and_return(receipt_handle) + + message_id2 = Random.uuid + receipt_handle2 = Random.uuid + message2 = create_mock_message + allow(message2).to receive(:message_id).and_return(message_id2) + allow(message2).to receive(:receipt_handle).and_return(receipt_handle2) + + deletion_result = instance_double(Aws::SQS::Types::DeleteMessageBatchResult) + expect(sqs_client).to receive(:delete_message_batch).with( + { + queue_url:, + entries: [ + { + id: message_id, + receipt_handle: receipt_handle, + }, + { + id: message_id2, + receipt_handle: receipt_handle2, + }, + ], + }, + ).and_return(deletion_result) + expect(sqs_batch_wrapper.delete_message_batch([message, message2])).to be(deletion_result) + end + end +end diff --git a/spec/jobs/in_person/enrollments_ready_for_status_check_job_spec.rb b/spec/jobs/in_person/enrollments_ready_for_status_check_job_spec.rb new file mode 100644 index 00000000000..429268793e7 --- /dev/null +++ b/spec/jobs/in_person/enrollments_ready_for_status_check_job_spec.rb @@ -0,0 +1,370 @@ +require 'rails_helper' + +RSpec.describe InPerson::EnrollmentsReadyForStatusCheckJob do + let(:in_person_proofing_enabled) { nil } + let(:in_person_enrollments_ready_job_enabled) { nil } + let(:analytics) { FakeAnalytics.new } + subject(:job) { described_class.new } + + describe '#perform' do + before(:each) do + allow(job).to receive(:analytics).and_return(analytics) + allow(IdentityConfig.store).to receive(:in_person_proofing_enabled). + and_return(in_person_proofing_enabled) + allow(IdentityConfig.store).to receive(:in_person_enrollments_ready_job_enabled). + and_return(in_person_enrollments_ready_job_enabled) + end + + def process_batch_result + { + fetched_items: 7, + processed_items: 7, + deleted_items: 4, + valid_items: 5, + invalid_items: 2, + } + end + + def new_message + instance_double(Aws::SQS::Types::Message) + end + + context 'in person proofing disabled' do + let(:in_person_proofing_enabled) { false } + let(:in_person_enrollments_ready_job_enabled) { true } + it 'returns true without doing anything' do + expect(analytics).not_to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + expect(analytics).not_to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ) + expect(job).not_to receive(:poll) + expect(job).not_to receive(:process_batch) + expect(job.perform(Time.zone.now)).to be(true) + end + end + + context 'job disabled' do + let(:in_person_proofing_enabled) { true } + let(:in_person_enrollments_ready_job_enabled) { false } + it 'returns true without doing anything' do + expect(analytics).not_to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + expect(analytics).not_to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ) + expect(job).not_to receive(:poll) + expect(job).not_to receive(:process_batch) + expect(job.perform(Time.zone.now)).to be(true) + end + end + + context 'in person proofing and job enabled' do + let(:in_person_proofing_enabled) { true } + let(:in_person_enrollments_ready_job_enabled) { true } + + it 'logs analytics for a run with zero batches' do + expect(job).to receive(:poll).and_return([]) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ).with( + fetched_items: 0, + processed_items: 0, + deleted_items: 0, + valid_items: 0, + invalid_items: 0, + incomplete_items: 0, + deletion_failed_items: 0, + ) + expect(job).not_to receive(:process_batch) + expect(job.perform(Time.zone.now)).to be(true) + end + + it 'logs analytics for a run with one batch' do + batch = [new_message] + expect(job).to receive(:poll).and_return(batch, []) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + expect(job).to receive(:process_batch).with( + batch, + { + fetched_items: 0, + processed_items: 0, + deleted_items: 0, + valid_items: 0, + invalid_items: 0, + }, + ) do |_batch, analytics_stats| + analytics_stats.merge!(process_batch_result) + end.once + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ).with( + fetched_items: 7, + processed_items: 7, + deleted_items: 4, + valid_items: 5, + invalid_items: 2, + incomplete_items: 0, + deletion_failed_items: 3, + ) + expect(job.perform(Time.zone.now)).to be(true) + end + + it 'logs analytics for a run with one batch that throws an error' do + batch = [new_message] + expect(job).to receive(:poll).and_return(batch) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + error = RuntimeError.new('test error') + expect(job).to receive(:process_batch).with( + batch, + an_instance_of(Hash), + ) do |_batch, analytics_stats| + analytics_stats.merge!(process_batch_result) + raise error + end.once + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ).with( + fetched_items: 7, + processed_items: 7, + deleted_items: 4, + valid_items: 5, + invalid_items: 2, + incomplete_items: 0, + deletion_failed_items: 3, + ) + expect { job.perform(Time.zone.now) }.to raise_error(error) + end + + it 'logs analytics for a run with three batches' do + batch = [new_message] + batch2 = [new_message] + batch3 = [new_message] + expect(job).to receive(:poll).and_return(batch, batch2, batch3, []) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + expect(job).to receive(:process_batch) do |current_batch, analytics_stats| + if batch == current_batch + process_batch_result.each do |key, value| + analytics_stats[key] = value + analytics_stats[key] + end + elsif current_batch == batch2 || current_batch == batch3 + { + fetched_items: 3, + processed_items: 3, + deleted_items: 2, + valid_items: 3, + invalid_items: 0, + }.each do |key, value| + analytics_stats[key] = value + analytics_stats[key] + end + end + end.exactly(3).times + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ).with( + fetched_items: 13, + processed_items: 13, + deleted_items: 8, + valid_items: 11, + invalid_items: 2, + incomplete_items: 0, + deletion_failed_items: 5, + ) + expect(job.perform(Time.zone.now)).to be(true) + end + + it 'logs analytics for a run with three batches with error' do + batch = [new_message] + batch2 = [new_message] + batch3 = [new_message] + expect(job).to receive(:poll).and_return(batch, batch2, batch3) + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_started, + ) + error = RuntimeError.new('test error') + expect(job).to receive(:process_batch) do |current_batch, analytics_stats| + if current_batch == batch + process_batch_result.each do |key, value| + analytics_stats[key] = value + analytics_stats[key] + end + elsif current_batch == batch3 + { + fetched_items: 3, + processed_items: 1, + deleted_items: 1, + valid_items: 1, + invalid_items: 0, + }.each do |key, value| + analytics_stats[key] = value + analytics_stats[key] + end + raise error + elsif current_batch == batch2 + { + fetched_items: 3, + processed_items: 3, + deleted_items: 2, + valid_items: 3, + invalid_items: 0, + }.each do |key, value| + analytics_stats[key] = value + analytics_stats[key] + end + end + end.exactly(3).times + expect(analytics).to receive( + :idv_in_person_proofing_enrollments_ready_for_status_check_job_completed, + ).with( + fetched_items: 13, + processed_items: 11, + deleted_items: 7, + valid_items: 9, + invalid_items: 2, + incomplete_items: 2, + deletion_failed_items: 4, + ) + expect { job.perform(Time.zone.now) }.to raise_error(error) + end + end + end + + # Normally we should stick to validating the contract and dependencies, + # but making an exception here because the construction of these classes + # is relatively complex; so expanding tests to additionally cover delegation + # and constructor calls. + # + # Also doing this b/c the codebase does not use an IoC framework like dry-system + # and there's not an established convention for creating factories. + + describe '#poll (private)' do + it 'delegates to sqs_batch_wrapper' do + sqs_batch_wrapper = instance_double(InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper) + poll_result = [] + expect(job).to receive(:sqs_batch_wrapper).and_return(sqs_batch_wrapper) + expect(sqs_batch_wrapper).to receive(:poll).and_return(poll_result).once + expect(job.send(:poll)).to be(poll_result) + end + end + + describe '#process_batch (private)' do + it 'delegates to batch_processor' do + batch_processor = instance_double(InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor) + expect(job).to receive(:batch_processor).and_return(batch_processor) + messages = [] + analytics_stats = {} + expect(batch_processor).to receive(:process_batch).with(messages, analytics_stats).once + job.send(:process_batch, messages, analytics_stats) + end + end + + describe '#analytics (private)' do + it 'creates an analytics object' do + analytics = FakeAnalytics.new + expect(Analytics).to receive(:new).with( + user: instance_of(AnonymousUser), + request: nil, + session: {}, + sp: nil, + ).and_return(analytics) + expect(job.send(:analytics)).to be(analytics) + end + end + + describe '#sqs_batch_wrapper (private)' do + it 'creates SQS batch wrapper object with expected params' do + sqs_client = instance_double(Aws::SQS::Client) + + queue_url = 'test/queue/url' + max_number_of_messages = 10 + visibility_timeout_seconds = 30 + wait_time_seconds = 20 + aws_http_timeout = 5 + + expect(Aws::SQS::Client).to receive(:new). + with(http_read_timeout: wait_time_seconds + aws_http_timeout). + and_return(sqs_client) + + expect(IdentityConfig.store).to receive_messages( + aws_http_timeout:, + in_person_enrollments_ready_job_queue_url: queue_url, + in_person_enrollments_ready_job_max_number_of_messages: max_number_of_messages, + in_person_enrollments_ready_job_visibility_timeout_seconds: visibility_timeout_seconds, + in_person_enrollments_ready_job_wait_time_seconds: wait_time_seconds, + ) + + wrapper = instance_double(InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper) + expect(InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper).to receive(:new). + with( + sqs_client: sqs_client, + queue_url:, + receive_params: { + queue_url:, + max_number_of_messages:, + visibility_timeout: visibility_timeout_seconds, + wait_time_seconds:, + }, + ).and_return(wrapper) + expect(job.send(:sqs_batch_wrapper)).to be(wrapper) + end + end + + describe '#batch_processor (private)' do + it 'creates a batch processor with the expected arguments' do + analytics = FakeAnalytics.new + expect(job).to receive(:analytics).and_return(analytics).exactly(2).times + + batch_processor_error_reporter = instance_double( + InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter, + ) + expect(InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter).to receive(:new). + with( + InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor.name, + analytics, + ).and_return(batch_processor_error_reporter) + + sqs_batch_wrapper = instance_double(InPerson::EnrollmentsReadyForStatusCheck::SqsBatchWrapper) + expect(job).to receive(:sqs_batch_wrapper).and_return(sqs_batch_wrapper) + + enrollment_pipeline_error_reporter = instance_double( + InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter, + ) + expect(InPerson::EnrollmentsReadyForStatusCheck::ErrorReporter).to receive(:new). + with( + InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline.name, + analytics, + ).and_return(enrollment_pipeline_error_reporter) + + email_body_pattern = 'abcd' + expect(IdentityConfig.store).to receive(:in_person_enrollments_ready_job_email_body_pattern). + and_return(email_body_pattern) + + enrollment_pipeline = instance_double( + InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline, + ) + expect(InPerson::EnrollmentsReadyForStatusCheck::EnrollmentPipeline).to receive(:new). + with( + error_reporter: enrollment_pipeline_error_reporter, + email_body_pattern: /abcd/, + ).and_return(enrollment_pipeline) + + batch_processor = instance_double(InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor) + expect(InPerson::EnrollmentsReadyForStatusCheck::BatchProcessor).to receive(:new). + with( + error_reporter: batch_processor_error_reporter, + sqs_batch_wrapper:, + enrollment_pipeline:, + ).and_return(batch_processor) + + expect(job.send(:batch_processor)).to be(batch_processor) + end + end +end