Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/queue_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def connection
class PublishFailedError < StandardError
end

def send_message(edition, event_type: nil, routing_key: nil)
def send_message(edition, event_type: nil, routing_key: nil, persistent: true)
return if @noop
routing_key ||= routing_key(edition, event_type)
publish_message(routing_key, edition, content_type: 'application/json', persistent: true)
publish_message(routing_key, edition, content_type: 'application/json', persistent: persistent)
end

def routing_key(edition, event_type)
Expand Down
46 changes: 20 additions & 26 deletions lib/requeue_content.rb
Original file line number Diff line number Diff line change
@@ -1,39 +1,33 @@
class RequeueContent
def initialize(number_of_items: nil)
@number_of_items = number_of_items
def initialize(scope)
# Restrict scope to stuff that's live (published or unpublished)
# Unpublished content without a content store representation won't
# be returned, but we're not interested in this content.
@scope = scope.where(content_store: :live)
@version = Event.maximum(:id)
end

attr_accessor :number_of_items

def call
if number_of_items.present?
Edition.where(state: :published).limit(number_of_items).each do |edition|
publish_to_queue(edition)
end
else
Edition.where(state: :published).find_each do |edition|
publish_to_queue(edition)
end
scope.each do |edition|
publish_to_queue(edition)
end
end

private

def publish_to_queue(edition)
version = Event.maximum(:id)

queue_payload = Presenters::EditionPresenter.new(
edition, draft: false,
).for_message_queue(version)
attr_reader :scope, :version

# FIXME: Rummager currently only listens to the message queue for the
# event type 'links'. This behaviour will eventually be updated so that
# it listens to other update types as well. This will happen as part of
# ongoing architectural work to make the message queue the sole source of
# search index updates. When that happens, the event_type below should
# be changed - perhaps to a newly introduced, more-appropriately named
# one. Maybe something like 'reindex'.
def publish_to_queue(edition)
presenter = DownstreamPayload.new(edition, version, draft: false)
queue_payload = presenter.message_queue_payload
service = PublishingAPI.service(:queue_publisher)

PublishingAPI.service(:queue_publisher).send_message(queue_payload, event_type: "links")
# Requeue is considered a different event_type to major, minor etc
# because we don't want to send additional email alerts to users.
service.send_message(
queue_payload,
routing_key: "#{edition.schema_name}.bulk.reindex",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if it's possible to do event_type: "bulk.reindex" here? AFAIK it's just string interpolation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess so but I thought this was less confusing as you can see what the key looks like all in one place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see what you mean. It's annoying having to include the duplication of edition.schema_name though. It's up to you, I'll approve the PR since it's not major.

persistent: false
)
end
end
14 changes: 11 additions & 3 deletions lib/tasks/queue.rake
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ namespace :queue do
end
end

desc "Add published editions to the message queue, optionally specifying a limit on the number of items"
task :requeue_content, [:number_of_items] => :environment do |_, args|
RequeueContent.new(number_of_items: args[:number_of_items]).call
desc "Add published editions to the message queue by document type"
task :requeue_document_type, [:document_type] => :environment do |_, args|
document_type = args[:document_type]
raise ValueError("expecting document_type") unless document_type.present?

scope = Edition
.with_document
.with_unpublishing
.where(document_type: document_type)

RequeueContent.new(scope).call
end
end
20 changes: 10 additions & 10 deletions spec/lib/requeue_content_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@
FactoryGirl.create(:live_edition, base_path: '/ci1')
FactoryGirl.create(:live_edition, base_path: '/ci2')
FactoryGirl.create(:live_edition, base_path: '/ci3')
FactoryGirl.create(:gone_live_edition, base_path: '/ci4')
FactoryGirl.create(:redirect_live_edition, base_path: '/ci5')
FactoryGirl.create(:draft_edition, base_path: '/ci5')
end

describe "#call" do
it "by default, it republishes all editions" do
expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message).exactly(3).times
RequeueContent.new.call
end
it "it republishes all live editions" do
scope = Edition
.with_document
.with_unpublishing

expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message).exactly(5).times

it "limits the number of items published, if a limit is provided" do
expect(PublishingAPI.service(:queue_publisher)).to receive(:send_message)
.exactly(1)
.times
.with(a_hash_including(:content_id), event_type: "links")
RequeueContent.new(number_of_items: 1).call
RequeueContent.new(scope).call
end
end
end