From 9e9c183c20269f399e6a4b0c45c119223b24b338 Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sat, 27 Feb 2016 18:57:45 -0500 Subject: [PATCH 01/12] Use PostgreSQL to push/pop log files to be processed. SQS delivers messages at-least-once, so we need to make sure our job is idempotent. The previous implementation relied on Redis for that; however, there was a TTL on the key, which could cause problems if a message is delivered again after the TTL expires. Besides, we would like to remove Redis from our stack for a few other reasons. --- app/jobs/fastly_log_processor.rb | 43 ++++++------------- app/models/log_ticket.rb | 27 ++++++++++++ .../20160227194735_create_log_tickets.rb | 14 ++++++ db/schema.rb | 13 +++++- lib/rubygem_fs.rb | 9 +++- lib/shoryuken/sqs_worker.rb | 9 ++-- test/unit/fastly_log_processor_test.rb | 33 +++----------- 7 files changed, 86 insertions(+), 62 deletions(-) create mode 100644 app/models/log_ticket.rb create mode 100644 db/migrate/20160227194735_create_log_tickets.rb diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index 59bdad93ccf..4e68c541f39 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -11,28 +11,26 @@ def initialize(bucket, key) end def perform - counts = download_counts + StatsD.increment('fastly_log_processor.processed') + + log_ticket = LogTicket.pop(key, bucket) + if log_ticket.nil? 
+ StatsD.increment('fastly_log_processor.extra') + return + end + + counts = download_counts(log_ticket) # Temporary feature flag while we roll out fastly log processing if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] != 'true' # Just log & exit w/out updating stats Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" - StatsD.increment('fastly_log_processor.processed') + StatsD.increment('fastly_log_processor.disabled') return end - # Check if this log has already been processed by another job - unless Redis.current.setnx(redis_key, 'processing') - raise FastlyLogProcessor::AlreadyProcessedError, "Already processed bucket: #{bucket} key: #{key}" - end - - # Set a short expiry while updating - Redis.current.expire(redis_key, 2.minutes) - Download.bulk_update(munge_for_bulk_update(counts)) - - Redis.current.set(redis_key, 'processed') - Redis.current.expire(redis_key, 30.days) + log_ticket.update(status: "processed") end # Takes an enumerator of log lines and returns a hash of download counts @@ -41,7 +39,9 @@ def perform # 'rails-4.0.0' => 25, # 'rails-4.2.0' => 50 # } - def download_counts(enumerator = log_lines) + def download_counts(log_ticket) + enumerator = log_ticket.filesystem.get(key).each_line + enumerator.each_with_object(Hash.new(0)) do |log_line, accum| path, response_code = log_line.split[10, 2] # Only count successful downloads @@ -67,19 +67,4 @@ def munge_for_bulk_update(download_counts) name ? [name, path, count] : nil end.compact end - - def log_lines - s3_body.each_line - end - - def s3_body - # TODO: Are rubygems' fastly logs gzipped? 
- io = Aws::S3::Object.new(bucket_name: bucket, key: key).get.body - io = Zlib::GzipReader.wrap(io) if key.end_with?('.gz') - io - end - - def redis_key - "fastly-log:#{bucket}:#{key}" - end end diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb new file mode 100644 index 00000000000..a2d3282007c --- /dev/null +++ b/app/models/log_ticket.rb @@ -0,0 +1,27 @@ +class LogTicket < ActiveRecord::Base + enum backend: [ :s3, :local ] + + scope :latest_pending, -> { limit(1).lock(true).select("id").where(status: "pending") } + + def self.pop(key = nil, directory = nil) + scope = latest_pending + scope = scope.where(key: key) if key + scope = scope.where(directory: directory) if directory + sql = scope.to_sql + + find_by_sql(["UPDATE #{quoted_table_name} SET status = ? WHERE id IN (#{sql}) RETURNING *", 'processing']).first + end + + def filesystem + @fs ||= + if s3? + RubygemFs::S3.new(access_key_id: ENV['S3_KEY'], + secret_access_key: ENV['S3_SECRET'], + region: Gemcutter.config['s3_region'], + endpoint: "https://#{Gemcutter.config['s3_endpoint']}", + bucket: directory) + else + RubygemFs::Local.new(directory) + end + end +end diff --git a/db/migrate/20160227194735_create_log_tickets.rb b/db/migrate/20160227194735_create_log_tickets.rb new file mode 100644 index 00000000000..39724a85b54 --- /dev/null +++ b/db/migrate/20160227194735_create_log_tickets.rb @@ -0,0 +1,14 @@ +class CreateLogTickets < ActiveRecord::Migration + def change + create_table :log_tickets do |t| + t.string :key + t.string :directory + t.integer :backend, default: 0 + t.string :status + + t.timestamps null: false + end + + add_index :log_tickets, [:directory, :key], unique: true + end +end diff --git a/db/schema.rb b/db/schema.rb index 2711384e14b..67d6ab25cad 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -11,7 +11,7 @@ # # It's strongly recommended that you check this file into your version control system. 
-ActiveRecord::Schema.define(version: 20150709170542) do +ActiveRecord::Schema.define(version: 20160227194735) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -76,6 +76,17 @@ add_index "linksets", ["rubygem_id"], name: "index_linksets_on_rubygem_id", using: :btree + create_table "log_tickets", force: :cascade do |t| + t.string "key" + t.string "directory" + t.integer "backend", default: 0 + t.string "status" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + add_index "log_tickets", ["directory", "key"], name: "index_log_tickets_on_directory_and_key", unique: true, using: :btree + create_table "oauth_access_grants", force: :cascade do |t| t.integer "resource_owner_id", null: false t.integer "application_id", null: false diff --git a/lib/rubygem_fs.rb b/lib/rubygem_fs.rb index 946d35cb1bd..056072fde64 100644 --- a/lib/rubygem_fs.rb +++ b/lib/rubygem_fs.rb @@ -31,6 +31,10 @@ def self.s3!(host) end class Local + def initialize(base_dir = nil) + @base_dir = base_dir + end + def store(key, body, _metadata = {}) FileUtils.mkdir_p File.dirname("#{base_dir}/#{key}") File.open("#{base_dir}/#{key}", 'wb') do |f| @@ -51,12 +55,13 @@ def remove(key) end def base_dir - Rails.root.join('server') + @base_dir || Rails.root.join('server') end end class S3 def initialize(config) + @bucket = config.delete(:bucket) @config = config end @@ -84,7 +89,7 @@ def restore(key) private def bucket - Gemcutter.config['s3_bucket'] + @bucket || Gemcutter.config['s3_bucket'] end def s3 diff --git a/lib/shoryuken/sqs_worker.rb b/lib/shoryuken/sqs_worker.rb index 0cd0ab858bc..473214c9d98 100644 --- a/lib/shoryuken/sqs_worker.rb +++ b/lib/shoryuken/sqs_worker.rb @@ -15,10 +15,13 @@ def perform(_sqs_msg, body) StatsD.increment('fastly_log_processor.s3_entry_fetched') - ActiveRecord::Base.transaction do - s3_objects.each do |bucket, key| - StatsD.increment('fastly_log_processor.enqueued') + 
s3_objects.each do |bucket, key| + StatsD.increment('fastly_log_processor.enqueued') + begin + LogTicket.create!(backend: "s3", key: key, directory: bucket, status: "pending") Delayed::Job.enqueue FastlyLogProcessor.new(bucket, key), priority: PRIORITIES[:stats] + rescue ActiveRecord::RecordNotUnique + StatsD.increment('fastly_log_processor.duplicated') end end end diff --git a/test/unit/fastly_log_processor_test.rb b/test/unit/fastly_log_processor_test.rb index d50ebb5bbcf..637e47d1fd4 100644 --- a/test/unit/fastly_log_processor_test.rb +++ b/test/unit/fastly_log_processor_test.rb @@ -15,6 +15,7 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase "json-1.8.2" => 4, "no-such-gem-1.2.3" => 1 } + @log_ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.log', status: "pending") Aws.config[:s3] = { stub_responses: { get_object: { body: @sample_log } } @@ -28,25 +29,9 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase ENV['FASTLY_LOG_PROCESSOR_ENABLED'] = @orig_fastly_log_processor_enabled end - context "#s3_body" do - should "return a readable object " do - assert @job.s3_body.respond_to?(:read) - end - end - - context "#log_lines" do - should "return an enumerator" do - assert_kind_of Enumerator, @job.log_lines - end - - should "have values" do - assert @job.log_lines.first - end - end - context "#download_counts" do should "be correct" do - assert_equal @sample_log_counts, @job.download_counts + assert_equal @sample_log_counts, @job.download_counts(@log_ticket) end end @@ -79,25 +64,19 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase context '#perform' do should "update download counts" do @job.perform - @sample_log_counts .reject { |k, _| k == "no-such-gem-1.2.3" } .each do |name, expected_count| - assert_equal expected_count, Version.find_by_full_name(name).downloads_count + assert_equal expected_count, Version.find_by_full_name(name).downloads_count, "invalid value for #{name}" end assert_equal 7, 
Rubygem.find_by_name('json').downloads - end - - should 'set the redis key' do - @job.perform - assert_equal 'processed', Redis.current.get(@job.redis_key) - assert_in_delta Redis.current.ttl(@job.redis_key), 30.days, 10 + assert_equal "processed", @log_ticket.reload.status end should "fail if already run" do - Redis.current.set(@job.redis_key, 'processed') - assert_raises(FastlyLogProcessor::AlreadyProcessedError) { @job.perform } + @log_ticket.update(status: 'processed') + @job.perform end end end From 8241e3fe352cd2affe58730b8421a1833ae607a5 Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 09:45:39 -0500 Subject: [PATCH 02/12] fix rubocop --- app/models/log_ticket.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb index a2d3282007c..d655397774a 100644 --- a/app/models/log_ticket.rb +++ b/app/models/log_ticket.rb @@ -1,5 +1,5 @@ class LogTicket < ActiveRecord::Base - enum backend: [ :s3, :local ] + enum backend: [:s3, :local] scope :latest_pending, -> { limit(1).lock(true).select("id").where(status: "pending") } From 4daaadd77c9ad22206d3d357551e22a109624a44 Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 11:50:21 -0500 Subject: [PATCH 03/12] Add order for pending tickets --- app/models/log_ticket.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb index d655397774a..96f058d4509 100644 --- a/app/models/log_ticket.rb +++ b/app/models/log_ticket.rb @@ -1,10 +1,10 @@ class LogTicket < ActiveRecord::Base enum backend: [:s3, :local] - scope :latest_pending, -> { limit(1).lock(true).select("id").where(status: "pending") } + scope :pending, -> { limit(1).lock(true).select("id").where(status: "pending").order("id ASC") } def self.pop(key = nil, directory = nil) - scope = latest_pending + scope = pending scope = scope.where(key: key) if key scope = scope.where(directory: directory) if directory 
sql = scope.to_sql From abdae3d150447a7d9db06cee1ccadd612a1fdabe Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 11:50:46 -0500 Subject: [PATCH 04/12] Raise if file not found on bucket or something --- app/jobs/fastly_log_processor.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index 4e68c541f39..dd74582bbef 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -1,7 +1,7 @@ require 'zlib' class FastlyLogProcessor - class AlreadyProcessedError < ::StandardError; end + class LogFileNotFoundError < ::StandardError; end attr_accessor :bucket, :key @@ -29,6 +29,7 @@ def perform return end + # TODO: wrap this in a transation when download update is in the DB Download.bulk_update(munge_for_bulk_update(counts)) log_ticket.update(status: "processed") end @@ -40,7 +41,9 @@ def perform # 'rails-4.2.0' => 50 # } def download_counts(log_ticket) - enumerator = log_ticket.filesystem.get(key).each_line + file = log_ticket.filesystem.get(key) + raise LogFileNotFoundError if file.nil? 
+ enumerator = file.each_line enumerator.each_with_object(Hash.new(0)) do |log_line, accum| path, response_code = log_line.split[10, 2] From 93a3cb05d5a716e5bd0dd7a75177e1df58f03f4e Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 11:50:59 -0500 Subject: [PATCH 05/12] Add more tests --- test/unit/fastly_log_processor_test.rb | 52 +++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/test/unit/fastly_log_processor_test.rb b/test/unit/fastly_log_processor_test.rb index 637e47d1fd4..faa2c89dbea 100644 --- a/test/unit/fastly_log_processor_test.rb +++ b/test/unit/fastly_log_processor_test.rb @@ -30,9 +30,21 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase end context "#download_counts" do - should "be correct" do + should "process file from s3" do assert_equal @sample_log_counts, @job.download_counts(@log_ticket) end + + should "process file from local fs" do + @log_ticket.update(backend: 'local', directory: 'test/sample_logs') + assert_equal @sample_log_counts, @job.download_counts(@log_ticket) + end + + should "fail if dont find the file" do + @log_ticket.update(backend: 'local', directory: 'foobar') + assert_raises FastlyLogProcessor::LogFileNotFoundError do + @job.download_counts(@log_ticket) + end + end end context "with gem data" do @@ -62,6 +74,12 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase end context '#perform' do + should "not double count" do + assert_equal 0, Rubygem.find_by_name('json').downloads + 3.times { @job.perform } + assert_equal 7, Rubygem.find_by_name('json').downloads + end + should "update download counts" do @job.perform @sample_log_counts @@ -74,9 +92,39 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase assert_equal "processed", @log_ticket.reload.status end - should "fail if already run" do + should "not run if already processed" do + assert_equal 0, Rubygem.find_by_name('json').downloads @log_ticket.update(status: 'processed') @job.perform + + assert_equal 0, 
Rubygem.find_by_name('json').downloads + end + + should "not mark as processed if anything fails" do + def @job.download_counts(_) + raise "woops" + end + assert_raises { @job.perform } + refute_equal "processed", @log_ticket.reload.status + end + + should "not re-process if it failed" do + def @job.download_counts(_) + raise "woops" + end + assert_raises { @job.perform } + + @job = FastlyLogProcessor.new('test-bucket', 'fastly-fake.log') + @job.perform + assert_equal 0, Rubygem.find_by_name('json').downloads + end + + should "only process the right file" do + ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.2.log', status: "pending") + + @job.perform + assert_equal "pending", ticket.reload.status + assert_equal "processed", @log_ticket.reload.status end end end From e809c5f7500d0059ce74ab46999f19c824fc3339 Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 12:33:22 -0500 Subject: [PATCH 06/12] Better pop API --- app/jobs/fastly_log_processor.rb | 2 +- app/models/log_ticket.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index dd74582bbef..6827c7df4c3 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -13,7 +13,7 @@ def initialize(bucket, key) def perform StatsD.increment('fastly_log_processor.processed') - log_ticket = LogTicket.pop(key, bucket) + log_ticket = LogTicket.pop(key: key, directory: bucket) if log_ticket.nil? 
StatsD.increment('fastly_log_processor.extra') return diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb index 96f058d4509..7870a25ff69 100644 --- a/app/models/log_ticket.rb +++ b/app/models/log_ticket.rb @@ -3,7 +3,7 @@ class LogTicket < ActiveRecord::Base scope :pending, -> { limit(1).lock(true).select("id").where(status: "pending").order("id ASC") } - def self.pop(key = nil, directory = nil) + def self.pop(key: nil, directory: nil) scope = pending scope = scope.where(key: key) if key scope = scope.where(directory: directory) if directory From a01c4b04613cb3e8c0479e93bddc528fa1f7d339 Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 12:33:33 -0500 Subject: [PATCH 07/12] Add LogTicket unit tests --- test/unit/log_ticket_test.rb | 89 ++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 test/unit/log_ticket_test.rb diff --git a/test/unit/log_ticket_test.rb b/test/unit/log_ticket_test.rb new file mode 100644 index 00000000000..b1092f65e1e --- /dev/null +++ b/test/unit/log_ticket_test.rb @@ -0,0 +1,89 @@ +require 'test_helper' + +class LogTicketTest < ActiveSupport::TestCase + + setup do + @log_ticket = LogTicket.create!(directory: "test", key: "foo", status: "pending") + end + + should "not allow duplicate directory and key" do + assert_raise ActiveRecord::RecordNotUnique do + LogTicket.create!(directory: "test", key: "foo") + end + end + + should "allow different keys for the same directory" do + LogTicket.create!(directory: "test", key: "bar") + LogTicket.create!(directory: "test", key: "baz") + end + + context "#pop" do + setup do + LogTicket.create!(directory: "test/2", key: "bar", status: "pending") + end + + context "without any key" do + should "pop the first inputed" do + ticket = LogTicket.pop + assert_equal "foo", ticket.key + end + end + + context "with a key" do + should "pop the key" do + ticket = LogTicket.pop(key: "bar") + assert_equal "bar", ticket.key + end + end + + context "with a 
directory" do + should "pop the first entry" do + LogTicket.create!(directory: "test/2", key: "boo", status: "pending") + + ticket = LogTicket.pop(directory: "test/2") + assert_equal "bar", ticket.key + end + end + + should "change the status" do + ticket = LogTicket.pop + assert_equal "processing", ticket.status + end + + should "return nil after no more items are in the queue" do + 2.times { LogTicket.pop } + assert_nil LogTicket.pop + end + end + + context "filesystem" do + + context "local" do + setup do + @log_ticket.update!(backend: "local") + end + + should "return a local fs" do + assert_kind_of RubygemFs::Local, @log_ticket.filesystem + end + + should "set the right base directory" do + assert_equal "test", @log_ticket.filesystem.send(:base_dir) + end + end + + context "s3" do + setup do + @log_ticket.update!(backend: "s3") + end + + should "return a s3 fs" do + assert_kind_of RubygemFs::S3, @log_ticket.filesystem + end + + should "set the right bucket" do + assert_equal "test", @log_ticket.filesystem.send(:bucket) + end + end + end +end From 07ee0f3ca336298236489ca2ef13fdb80bbdc62c Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 12:38:19 -0500 Subject: [PATCH 08/12] remove extra lines --- test/unit/log_ticket_test.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/unit/log_ticket_test.rb b/test/unit/log_ticket_test.rb index b1092f65e1e..b3916499d82 100644 --- a/test/unit/log_ticket_test.rb +++ b/test/unit/log_ticket_test.rb @@ -1,7 +1,6 @@ require 'test_helper' class LogTicketTest < ActiveSupport::TestCase - setup do @log_ticket = LogTicket.create!(directory: "test", key: "foo", status: "pending") end @@ -57,7 +56,6 @@ class LogTicketTest < ActiveSupport::TestCase end context "filesystem" do - context "local" do setup do @log_ticket.update!(backend: "local") From 346d21788f62bc70295c73ff9672bf325a396acc Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 12:53:24 -0500 Subject: [PATCH 09/12] Move S3 config to 
rubygems_fs wrapper Also add more tests --- app/models/log_ticket.rb | 6 +----- lib/rubygem_fs.rb | 24 +++++++++++------------- test/unit/log_ticket_test.rb | 4 ++-- test/unit/rubygem_fs_test.rb | 25 +++++++++++++++++++++---- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb index 7870a25ff69..cd38f53cd6c 100644 --- a/app/models/log_ticket.rb +++ b/app/models/log_ticket.rb @@ -15,11 +15,7 @@ def self.pop(key: nil, directory: nil) def filesystem @fs ||= if s3? - RubygemFs::S3.new(access_key_id: ENV['S3_KEY'], - secret_access_key: ENV['S3_SECRET'], - region: Gemcutter.config['s3_region'], - endpoint: "https://#{Gemcutter.config['s3_endpoint']}", - bucket: directory) + RubygemFs::S3.new(bucket: directory) else RubygemFs::Local.new(directory) end diff --git a/lib/rubygem_fs.rb b/lib/rubygem_fs.rb index 056072fde64..39f17361504 100644 --- a/lib/rubygem_fs.rb +++ b/lib/rubygem_fs.rb @@ -4,24 +4,17 @@ def self.instance if Rails.env.development? RubygemFs::Local.new else - RubygemFs::S3.new(access_key_id: ENV['S3_KEY'], - secret_access_key: ENV['S3_SECRET'], - region: Gemcutter.config['s3_region'], - endpoint: "https://#{Gemcutter.config['s3_endpoint']}") + RubygemFs::S3.new end end def self.mock! 
- @fs = RubygemFs::Local.new - @fs.define_singleton_method(:base_dir) do - @dir ||= Dir.mktmpdir - end + @fs = RubygemFs::Local.new(Dir.mktmpdir) end def self.s3!(host) @fs = RubygemFs::S3.new(access_key_id: 'k', secret_access_key: 's', - region: Gemcutter.config['s3_region'], endpoint: host, force_path_style: true) @fs.define_singleton_method(:init) do @@ -60,9 +53,14 @@ def base_dir end class S3 - def initialize(config) + def initialize(config = {}) @bucket = config.delete(:bucket) - @config = config + @config = { + access_key_id: ENV['S3_KEY'], + secret_access_key: ENV['S3_SECRET'], + region: Gemcutter.config['s3_region'], + endpoint: "https://#{Gemcutter.config['s3_endpoint']}" + }.merge(config) end def store(key, body, metadata = {}) @@ -86,12 +84,12 @@ def restore(key) s3.delete_object(key: key, bucket: bucket, version_id: version_id) end - private - def bucket @bucket || Gemcutter.config['s3_bucket'] end + private + def s3 @s3 ||= Aws::S3::Client.new(@config) end diff --git a/test/unit/log_ticket_test.rb b/test/unit/log_ticket_test.rb index b3916499d82..fdc888c5c32 100644 --- a/test/unit/log_ticket_test.rb +++ b/test/unit/log_ticket_test.rb @@ -66,7 +66,7 @@ class LogTicketTest < ActiveSupport::TestCase end should "set the right base directory" do - assert_equal "test", @log_ticket.filesystem.send(:base_dir) + assert_equal "test", @log_ticket.filesystem.base_dir end end @@ -80,7 +80,7 @@ class LogTicketTest < ActiveSupport::TestCase end should "set the right bucket" do - assert_equal "test", @log_ticket.filesystem.send(:bucket) + assert_equal "test", @log_ticket.filesystem.bucket end end end diff --git a/test/unit/rubygem_fs_test.rb b/test/unit/rubygem_fs_test.rb index aa844ea7679..e5b4b5655b9 100644 --- a/test/unit/rubygem_fs_test.rb +++ b/test/unit/rubygem_fs_test.rb @@ -1,12 +1,29 @@ require 'test_helper' class RubygemFsTest < ActiveSupport::TestCase + context "s3 filesystem" do + should "use default bucket when not passing as an argument" do + fs = 
RubygemFs::S3.new + assert_equal "test.s3.rubygems.org", fs.bucket + end + + should "use bucket passed" do + fs = RubygemFs::S3.new(bucket: "foo.com") + assert_equal "foo.com", fs.bucket + end + + should "use a custom config when passed" do + fs = RubygemFs::S3.new(access_key_id: 'foo', secret_access_key: 'bar') + def fs.s3 + [@config[:access_key_id], @config[:secret_access_key]] + end + assert_equal ['foo', 'bar'], fs.s3 + end + end + context "local filesystem" do setup do - @fs = RubygemFs::Local.new - def @fs.base_dir - @dir ||= Dir.mktmpdir - end + @fs = RubygemFs::Local.new(Dir.mktmpdir) end context "#get" do From c03988b4a962ce36426184f8d88a5d4e79937bbc Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 12:54:40 -0500 Subject: [PATCH 10/12] use %w instead of array --- test/unit/rubygem_fs_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/rubygem_fs_test.rb b/test/unit/rubygem_fs_test.rb index e5b4b5655b9..5312c9b48c8 100644 --- a/test/unit/rubygem_fs_test.rb +++ b/test/unit/rubygem_fs_test.rb @@ -17,7 +17,7 @@ class RubygemFsTest < ActiveSupport::TestCase def fs.s3 [@config[:access_key_id], @config[:secret_access_key]] end - assert_equal ['foo', 'bar'], fs.s3 + assert_equal %w(foo bar), fs.s3 end end From 4d634cf5fc7e033bf159b8400e061eef181f668a Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 13:04:41 -0500 Subject: [PATCH 11/12] still mark as processed even if feature flag disabled --- app/jobs/fastly_log_processor.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index 6827c7df4c3..257e778a801 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -21,16 +21,16 @@ def perform counts = download_counts(log_ticket) + # TODO: wrap this in a transation when download update is in the DB + # Temporary feature flag while we roll out fastly log processing - if 
ENV['FASTLY_LOG_PROCESSOR_ENABLED'] != 'true' + if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] == 'true' + Download.bulk_update(munge_for_bulk_update(counts)) + else # Just log & exit w/out updating stats Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" StatsD.increment('fastly_log_processor.disabled') - return end - - # TODO: wrap this in a transation when download update is in the DB - Download.bulk_update(munge_for_bulk_update(counts)) log_ticket.update(status: "processed") end From 1b0a5f06e22bc4f0ede2eeb5ade340ee28579ddd Mon Sep 17 00:00:00 2001 From: Arthur Neves Date: Sun, 28 Feb 2016 13:22:39 -0500 Subject: [PATCH 12/12] Move logger to outside feature flag --- app/jobs/fastly_log_processor.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index 257e778a801..59547bb25c2 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -23,12 +23,13 @@ def perform # TODO: wrap this in a transation when download update is in the DB + Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" # Temporary feature flag while we roll out fastly log processing if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] == 'true' - Download.bulk_update(munge_for_bulk_update(counts)) + updates = munge_for_bulk_update(counts) + Download.bulk_update(updates) else # Just log & exit w/out updating stats - Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" StatsD.increment('fastly_log_processor.disabled') end log_ticket.update(status: "processed")