diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index 59bdad93ccf..59547bb25c2 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -1,7 +1,7 @@ require 'zlib' class FastlyLogProcessor - class AlreadyProcessedError < ::StandardError; end + class LogFileNotFoundError < ::StandardError; end attr_accessor :bucket, :key @@ -11,28 +11,28 @@ def initialize(bucket, key) end def perform - counts = download_counts + StatsD.increment('fastly_log_processor.processed') - # Temporary feature flag while we roll out fastly log processing - if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] != 'true' - # Just log & exit w/out updating stats - Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" - StatsD.increment('fastly_log_processor.processed') + log_ticket = LogTicket.pop(key: key, directory: bucket) + if log_ticket.nil? + StatsD.increment('fastly_log_processor.extra') return end - # Check if this log has already been processed by another job - unless Redis.current.setnx(redis_key, 'processing') - raise FastlyLogProcessor::AlreadyProcessedError, "Already processed bucket: #{bucket} key: #{key}" - end - - # Set a short expiry while updating - Redis.current.expire(redis_key, 2.minutes) + counts = download_counts(log_ticket) - Download.bulk_update(munge_for_bulk_update(counts)) + # TODO: wrap this in a transation when download update is in the DB - Redis.current.set(redis_key, 'processed') - Redis.current.expire(redis_key, 30.days) + Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}" + # Temporary feature flag while we roll out fastly log processing + if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] == 'true' + updates = munge_for_bulk_update(counts) + Download.bulk_update(updates) + else + # Just log & exit w/out updating stats + StatsD.increment('fastly_log_processor.disabled') + end + log_ticket.update(status: "processed") end # Takes an enumerator of log lines and returns a hash of download counts @@ -41,7 +41,11 @@ def perform # 'rails-4.0.0' => 25, # 'rails-4.2.0' => 50 # } - def download_counts(enumerator = log_lines) + def download_counts(log_ticket) + file = log_ticket.filesystem.get(key) + raise LogFileNotFoundError if file.nil? + enumerator = file.each_line + enumerator.each_with_object(Hash.new(0)) do |log_line, accum| path, response_code = log_line.split[10, 2] # Only count successful downloads @@ -67,19 +71,4 @@ def munge_for_bulk_update(download_counts) name ? [name, path, count] : nil end.compact end - - def log_lines - s3_body.each_line - end - - def s3_body - # TODO: Are rubygems' fastly logs gzipped? - io = Aws::S3::Object.new(bucket_name: bucket, key: key).get.body - io = Zlib::GzipReader.wrap(io) if key.end_with?('.gz') - io - end - - def redis_key - "fastly-log:#{bucket}:#{key}" - end end diff --git a/app/models/log_ticket.rb b/app/models/log_ticket.rb new file mode 100644 index 00000000000..cd38f53cd6c --- /dev/null +++ b/app/models/log_ticket.rb @@ -0,0 +1,23 @@ +class LogTicket < ActiveRecord::Base + enum backend: [:s3, :local] + + scope :pending, -> { limit(1).lock(true).select("id").where(status: "pending").order("id ASC") } + + def self.pop(key: nil, directory: nil) + scope = pending + scope = scope.where(key: key) if key + scope = scope.where(directory: directory) if directory + sql = scope.to_sql + + find_by_sql(["UPDATE #{quoted_table_name} SET status = ? WHERE id IN (#{sql}) RETURNING *", 'processing']).first + end + + def filesystem + @fs ||= + if s3? + RubygemFs::S3.new(bucket: directory) + else + RubygemFs::Local.new(directory) + end + end +end diff --git a/db/migrate/20160227194735_create_log_tickets.rb b/db/migrate/20160227194735_create_log_tickets.rb new file mode 100644 index 00000000000..39724a85b54 --- /dev/null +++ b/db/migrate/20160227194735_create_log_tickets.rb @@ -0,0 +1,14 @@ +class CreateLogTickets < ActiveRecord::Migration + def change + create_table :log_tickets do |t| + t.string :key + t.string :directory + t.integer :backend, default: 0 + t.string :status + + t.timestamps null: false + end + + add_index :log_tickets, [:directory, :key], unique: true + end +end diff --git a/db/schema.rb b/db/schema.rb index 2711384e14b..67d6ab25cad 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -11,7 +11,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20150709170542) do +ActiveRecord::Schema.define(version: 20160227194735) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -76,6 +76,17 @@ add_index "linksets", ["rubygem_id"], name: "index_linksets_on_rubygem_id", using: :btree + create_table "log_tickets", force: :cascade do |t| + t.string "key" + t.string "directory" + t.integer "backend", default: 0 + t.string "status" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + add_index "log_tickets", ["directory", "key"], name: "index_log_tickets_on_directory_and_key", unique: true, using: :btree + create_table "oauth_access_grants", force: :cascade do |t| t.integer "resource_owner_id", null: false t.integer "application_id", null: false diff --git a/lib/rubygem_fs.rb b/lib/rubygem_fs.rb index 946d35cb1bd..39f17361504 100644 --- a/lib/rubygem_fs.rb +++ b/lib/rubygem_fs.rb @@ -4,24 +4,17 @@ def self.instance if Rails.env.development? RubygemFs::Local.new else - RubygemFs::S3.new(access_key_id: ENV['S3_KEY'], - secret_access_key: ENV['S3_SECRET'], - region: Gemcutter.config['s3_region'], - endpoint: "https://#{Gemcutter.config['s3_endpoint']}") + RubygemFs::S3.new end end def self.mock! - @fs = RubygemFs::Local.new - @fs.define_singleton_method(:base_dir) do - @dir ||= Dir.mktmpdir - end + @fs = RubygemFs::Local.new(Dir.mktmpdir) end def self.s3!(host) @fs = RubygemFs::S3.new(access_key_id: 'k', secret_access_key: 's', - region: Gemcutter.config['s3_region'], endpoint: host, force_path_style: true) @fs.define_singleton_method(:init) do @@ -31,6 +24,10 @@ def self.s3!(host) end class Local + def initialize(base_dir = nil) + @base_dir = base_dir + end + def store(key, body, _metadata = {}) FileUtils.mkdir_p File.dirname("#{base_dir}/#{key}") File.open("#{base_dir}/#{key}", 'wb') do |f| @@ -51,13 +48,19 @@ def remove(key) end def base_dir - Rails.root.join('server') + @base_dir || Rails.root.join('server') end end class S3 - def initialize(config) - @config = config + def initialize(config = {}) + @bucket = config.delete(:bucket) + @config = { + access_key_id: ENV['S3_KEY'], + secret_access_key: ENV['S3_SECRET'], + region: Gemcutter.config['s3_region'], + endpoint: "https://#{Gemcutter.config['s3_endpoint']}" + }.merge(config) end def store(key, body, metadata = {}) @@ -81,12 +84,12 @@ def restore(key) s3.delete_object(key: key, bucket: bucket, version_id: version_id) end - private - def bucket - Gemcutter.config['s3_bucket'] + @bucket || Gemcutter.config['s3_bucket'] end + private + def s3 @s3 ||= Aws::S3::Client.new(@config) end diff --git a/lib/shoryuken/sqs_worker.rb b/lib/shoryuken/sqs_worker.rb index 0cd0ab858bc..473214c9d98 100644 --- a/lib/shoryuken/sqs_worker.rb +++ b/lib/shoryuken/sqs_worker.rb @@ -15,10 +15,13 @@ def perform(_sqs_msg, body) StatsD.increment('fastly_log_processor.s3_entry_fetched') - ActiveRecord::Base.transaction do - s3_objects.each do |bucket, key| - StatsD.increment('fastly_log_processor.enqueued') + s3_objects.each do |bucket, key| + StatsD.increment('fastly_log_processor.enqueued') + begin + LogTicket.create!(backend: "s3", key: key, directory: bucket, status: "pending") Delayed::Job.enqueue FastlyLogProcessor.new(bucket, key), priority: PRIORITIES[:stats] + rescue ActiveRecord::RecordNotUnique + StatsD.increment('fastly_log_processor.duplicated') end end end diff --git a/test/unit/fastly_log_processor_test.rb b/test/unit/fastly_log_processor_test.rb index d50ebb5bbcf..faa2c89dbea 100644 --- a/test/unit/fastly_log_processor_test.rb +++ b/test/unit/fastly_log_processor_test.rb @@ -15,6 +15,7 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase "json-1.8.2" => 4, "no-such-gem-1.2.3" => 1 } + @log_ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.log', status: "pending") Aws.config[:s3] = { stub_responses: { get_object: { body: @sample_log } } @@ -28,25 +29,21 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase ENV['FASTLY_LOG_PROCESSOR_ENABLED'] = @orig_fastly_log_processor_enabled end - context "#s3_body" do - should "return a readable object " do - assert @job.s3_body.respond_to?(:read) - end - end - - context "#log_lines" do - should "return an enumerator" do - assert_kind_of Enumerator, @job.log_lines + context "#download_counts" do + should "process file from s3" do + assert_equal @sample_log_counts, @job.download_counts(@log_ticket) end - should "have values" do - assert @job.log_lines.first + should "process file from local fs" do + @log_ticket.update(backend: 'local', directory: 'test/sample_logs') + assert_equal @sample_log_counts, @job.download_counts(@log_ticket) end - end - context "#download_counts" do - should "be correct" do - assert_equal @sample_log_counts, @job.download_counts + should "fail if dont find the file" do + @log_ticket.update(backend: 'local', directory: 'foobar') + assert_raises FastlyLogProcessor::LogFileNotFoundError do + @job.download_counts(@log_ticket) + end end end @@ -77,27 +74,57 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase end context '#perform' do + should "not double count" do + assert_equal 0, Rubygem.find_by_name('json').downloads + 3.times { @job.perform } + assert_equal 7, Rubygem.find_by_name('json').downloads + end + should "update download counts" do @job.perform - @sample_log_counts .reject { |k, _| k == "no-such-gem-1.2.3" } .each do |name, expected_count| - assert_equal expected_count, Version.find_by_full_name(name).downloads_count + assert_equal expected_count, Version.find_by_full_name(name).downloads_count, "invalid value for #{name}" end assert_equal 7, Rubygem.find_by_name('json').downloads + assert_equal "processed", @log_ticket.reload.status end - should 'set the redis key' do + should "not run if already processed" do + assert_equal 0, Rubygem.find_by_name('json').downloads + @log_ticket.update(status: 'processed') @job.perform - assert_equal 'processed', Redis.current.get(@job.redis_key) - assert_in_delta Redis.current.ttl(@job.redis_key), 30.days, 10 + + assert_equal 0, Rubygem.find_by_name('json').downloads + end + + should "not mark as processed if anything fails" do + def @job.download_counts(_) + raise "woops" + end + assert_raises { @job.perform } + refute_equal "processed", @log_ticket.reload.status end - should "fail if already run" do - Redis.current.set(@job.redis_key, 'processed') - assert_raises(FastlyLogProcessor::AlreadyProcessedError) { @job.perform } + should "not re-process if it failed" do + def @job.download_counts(_) + raise "woops" + end + assert_raises { @job.perform } + + @job = FastlyLogProcessor.new('test-bucket', 'fastly-fake.log') + @job.perform + assert_equal 0, Rubygem.find_by_name('json').downloads + end + + should "only process the right file" do + ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.2.log', status: "pending") + + @job.perform + assert_equal "pending", ticket.reload.status + assert_equal "processed", @log_ticket.reload.status end end end diff --git a/test/unit/log_ticket_test.rb b/test/unit/log_ticket_test.rb new file mode 100644 index 00000000000..fdc888c5c32 --- /dev/null +++ b/test/unit/log_ticket_test.rb @@ -0,0 +1,87 @@ +require 'test_helper' + +class LogTicketTest < ActiveSupport::TestCase + setup do + @log_ticket = LogTicket.create!(directory: "test", key: "foo", status: "pending") + end + + should "not allow duplicate directory and key" do + assert_raise ActiveRecord::RecordNotUnique do + LogTicket.create!(directory: "test", key: "foo") + end + end + + should "allow different keys for the same directory" do + LogTicket.create!(directory: "test", key: "bar") + LogTicket.create!(directory: "test", key: "baz") + end + + context "#pop" do + setup do + LogTicket.create!(directory: "test/2", key: "bar", status: "pending") + end + + context "without any key" do + should "pop the first inputed" do + ticket = LogTicket.pop + assert_equal "foo", ticket.key + end + end + + context "with a key" do + should "pop the key" do + ticket = LogTicket.pop(key: "bar") + assert_equal "bar", ticket.key + end + end + + context "with a directory" do + should "pop the first entry" do + LogTicket.create!(directory: "test/2", key: "boo", status: "pending") + + ticket = LogTicket.pop(directory: "test/2") + assert_equal "bar", ticket.key + end + end + + should "change the status" do + ticket = LogTicket.pop + assert_equal "processing", ticket.status + end + + should "return nil after no more items are in the queue" do + 2.times { LogTicket.pop } + assert_nil LogTicket.pop + end + end + + context "filesystem" do + context "local" do + setup do + @log_ticket.update!(backend: "local") + end + + should "return a local fs" do + assert_kind_of RubygemFs::Local, @log_ticket.filesystem + end + + should "set the right base directory" do + assert_equal "test", @log_ticket.filesystem.base_dir + end + end + + context "s3" do + setup do + @log_ticket.update!(backend: "s3") + end + + should "return a s3 fs" do + assert_kind_of RubygemFs::S3, @log_ticket.filesystem + end + + should "set the right bucket" do + assert_equal "test", @log_ticket.filesystem.bucket + end + end + end +end diff --git a/test/unit/rubygem_fs_test.rb b/test/unit/rubygem_fs_test.rb index aa844ea7679..5312c9b48c8 100644 --- a/test/unit/rubygem_fs_test.rb +++ b/test/unit/rubygem_fs_test.rb @@ -1,12 +1,29 @@ require 'test_helper' class RubygemFsTest < ActiveSupport::TestCase + context "s3 filesystem" do + should "use default bucket when not passing as an argument" do + fs = RubygemFs::S3.new + assert_equal "test.s3.rubygems.org", fs.bucket + end + + should "use bucket passed" do + fs = RubygemFs::S3.new(bucket: "foo.com") + assert_equal "foo.com", fs.bucket + end + + should "use a custom config when passed" do + fs = RubygemFs::S3.new(access_key_id: 'foo', secret_access_key: 'bar') + def fs.s3 + [@config[:access_key_id], @config[:secret_access_key]] + end + assert_equal %w(foo bar), fs.s3 + end + end + context "local filesystem" do setup do - @fs = RubygemFs::Local.new - def @fs.base_dir - @dir ||= Dir.mktmpdir - end + @fs = RubygemFs::Local.new(Dir.mktmpdir) end context "#get" do