From acb9f16d2c545c788e03bf6aad2c6e07113770f7 Mon Sep 17 00:00:00 2001 From: Mat Moore Date: Mon, 21 Aug 2017 11:56:08 +0000 Subject: [PATCH] Update requeue task to requeue by document type This task was added when the Finding Things team migrated all the links to the Publishing API, to sync everything back to Rummager. We now need something similar to synchronise all existing content, so that Rummager can be a direct downstream consumer of the Publishing API. This includes a few implementation changes: 1. Use non-persistent messages, because we want requeues to be as fast and low-overhead as possible. Requeued messages are less important than normal publishing messages and should be abandoned if RabbitMQ goes down. 2. Use a separate routing key - `*.bulk.reindex` - to indicate the intent (see removed comment). This allows us to define a separate non-durable queue for these messages in Rummager; currently the `rummager_govuk_index` queue receives everything and is set up as `durable` to survive server restarts. 3. Filter on `content_store` instead of `state`, because we care about unpublished (especially withdrawn) editions as well as published ones. 4. Filter by document type for now, as we don't need the whole lot yet. We may add a 'requeue everything' task back in later. 
--- lib/queue_publisher.rb | 4 +-- lib/requeue_content.rb | 46 ++++++++++++++------------------ lib/tasks/queue.rake | 14 +++++++--- spec/lib/requeue_content_spec.rb | 20 +++++++------- 4 files changed, 43 insertions(+), 41 deletions(-) diff --git a/lib/queue_publisher.rb b/lib/queue_publisher.rb index 8e4a8e603e..de5bf41c80 100644 --- a/lib/queue_publisher.rb +++ b/lib/queue_publisher.rb @@ -18,10 +18,10 @@ def connection class PublishFailedError < StandardError end - def send_message(edition, event_type: nil, routing_key: nil) + def send_message(edition, event_type: nil, routing_key: nil, persistent: true) return if @noop routing_key ||= routing_key(edition, event_type) - publish_message(routing_key, edition, content_type: 'application/json', persistent: true) + publish_message(routing_key, edition, content_type: 'application/json', persistent: persistent) end def routing_key(edition, event_type) diff --git a/lib/requeue_content.rb b/lib/requeue_content.rb index 24e1884133..674fca3217 100644 --- a/lib/requeue_content.rb +++ b/lib/requeue_content.rb @@ -1,39 +1,33 @@ class RequeueContent - def initialize(number_of_items: nil) - @number_of_items = number_of_items + def initialize(scope) + # Restrict scope to stuff that's live (published or unpublished) + # Unpublished content without a content store representation won't + # be returned, but we're not interested in this content. + @scope = scope.where(content_store: :live) + @version = Event.maximum(:id) end - attr_accessor :number_of_items - def call - if number_of_items.present? 
- Edition.where(state: :published).limit(number_of_items).each do |edition| - publish_to_queue(edition) - end - else - Edition.where(state: :published).find_each do |edition| - publish_to_queue(edition) - end + scope.each do |edition| + publish_to_queue(edition) end end private - def publish_to_queue(edition) - version = Event.maximum(:id) - - queue_payload = Presenters::EditionPresenter.new( - edition, draft: false, - ).for_message_queue(version) + attr_reader :scope, :version - # FIXME: Rummager currently only listens to the message queue for the - # event type 'links'. This behaviour will eventually be updated so that - # it listens to other update types as well. This will happen as part of - # ongoing architectural work to make the message queue the sole source of - # search index updates. When that happens, the event_type below should - # be changed - perhaps to a newly introduced, more-appropriately named - # one. Maybe something like 'reindex'. + def publish_to_queue(edition) + presenter = DownstreamPayload.new(edition, version, draft: false) + queue_payload = presenter.message_queue_payload + service = PublishingAPI.service(:queue_publisher) - PublishingAPI.service(:queue_publisher).send_message(queue_payload, event_type: "links") + # Requeue is considered a different event_type to major, minor etc + # because we don't want to send additional email alerts to users. 
+ service.send_message( + queue_payload, + routing_key: "#{edition.schema_name}.bulk.reindex", + persistent: false + ) end end diff --git a/lib/tasks/queue.rake b/lib/tasks/queue.rake index 519c72af80..43c0f8d5e1 100644 --- a/lib/tasks/queue.rake +++ b/lib/tasks/queue.rake @@ -28,8 +28,16 @@ namespace :queue do end end - desc "Add published editions to the message queue, optionally specifying a limit on the number of items" - task :requeue_content, [:number_of_items] => :environment do |_, args| - RequeueContent.new(number_of_items: args[:number_of_items]).call + desc "Add published editions to the message queue by document type" + task :requeue_document_type, [:document_type] => :environment do |_, args| + document_type = args[:document_type] + raise ValueError("expecting document_type") unless document_type.present? + + scope = Edition + .with_document + .with_unpublishing + .where(document_type: document_type) + + RequeueContent.new(scope).call end end diff --git a/spec/lib/requeue_content_spec.rb b/spec/lib/requeue_content_spec.rb index d2c53157c2..da2e2e7b25 100644 --- a/spec/lib/requeue_content_spec.rb +++ b/spec/lib/requeue_content_spec.rb @@ -5,20 +5,20 @@ FactoryGirl.create(:live_edition, base_path: '/ci1') FactoryGirl.create(:live_edition, base_path: '/ci2') FactoryGirl.create(:live_edition, base_path: '/ci3') + FactoryGirl.create(:gone_live_edition, base_path: '/ci4') + FactoryGirl.create(:redirect_live_edition, base_path: '/ci5') + FactoryGirl.create(:draft_edition, base_path: '/ci5') end describe "#call" do - it "by default, it republishes all editions" do - expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message).exactly(3).times - RequeueContent.new.call - end + it "it republishes all live editions" do + scope = Edition + .with_document + .with_unpublishing + + expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message).exactly(5).times - it "limits the number of items published, if a limit is provided" do - 
expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message) - .exactly(1) - .times - .with(a_hash_including(:content_id), event_type: "links") - RequeueContent.new(number_of_items: 1).call + RequeueContent.new(scope).call end end end